[SPARK-26572][SQL] fix aggregate codegen result evaluation #23731
Conversation
Change-Id: Ie07c913fc4586296c8187f0972c19169da25f613
|
I think we should handle this case in a planner? Could you check again? |
|
btw, could you describe more in the PR description? what's the root cause of this issue? How did this pr fix the issue? brabrabra.... |
|
The reason why I think this is a code generation issue is that the query returns the correct result if you disable whole-stage codegen. This is the physical plan of the example in the ticket, and if you take a look at the code of stage 3 (I left some comments in it regarding what my PR does), you can see that both the hash aggregate and the broadcast join are required in one codegen stage to hit this issue. It is also important that the aggregate is on the "stream" side. This might be a rare case, which explains why this issue hasn't come up earlier. |
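The shape of the problem can be sketched in plain Scala, with no Spark involved (`nextId` here is a hypothetical stand-in for the partition-local counter behind `monotonically_increasing_id()`; this models the generated code's structure, not Spark's actual codegen output):

```scala
object DeferredEvalSketch {
  // Mutable state behind a model non-deterministic expression.
  private var counter = 0L
  private def nextId(): Long = { counter += 1; counter }

  // Buggy shape: the aggregate's result expression is (re-)evaluated inside the
  // broadcast join's row loop, so one aggregate output row yields several ids.
  def buggyIds(joinMatches: Seq[String]): Seq[Long] = {
    counter = 0L
    joinMatches.map(_ => nextId())
  }

  // Fixed shape: evaluation is forced once, before entering the join loop.
  def fixedIds(joinMatches: Seq[String]): Seq[Long] = {
    counter = 0L
    val id = nextId()
    joinMatches.map(_ => id)
  }
}
```

When one aggregate row matches three build-side rows, the buggy shape emits three different ids for what should be a single value, which is exactly the incorrect result reported in the ticket.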
|
The changes makes sense to me, but I think this problem was introduced in SPARK-13404, which claimed to have a significant perf gain (about 30% on TPCDS Q55), so it would be great if we can fix this without introducing perf regression. @peter-toth may you please run (and post the results) the benchmarks in order to ensure we are not introducing a perf regression with this PR? @davies you are the author of that PR, do you have time to check this? |
|
ok to test |
|
This issue happens only in the case of stateful exprs? If so, could you modify the code to apply the current fix only when such expressions are present? |
$evaluateKeyVars
$evaluateBufferVars
$evaluateAggResults
$evaluateResultVars
We need this change?
I think so. If you replace `.distinct()` with `.groupBy("idx").max()` in the example, then this code path runs and the change fixes the same issue.
If so, could you please add test cases to cover all the code paths you added in this PR.
Thanks. I've added that path to the test.
|
Test build #102034 has finished for PR 23731 at commit
|
|
Here are my benchmark results of q55. I ran it 3 times on master and 3 times on this PR branch. Although the results vary a bit, it seems this patch would introduce some performance degradation. |
|
@peter-toth did you run the benchmark also on the other queries? My guess is that it may also happen that q55 gets some perf degradation, but others improve. In that case we should average over all the queries to decide whether the impact is positive or not. In case we decide to limit this to be done only for some expressions, we should do it for those which are non-deterministic. |
|
Thanks @mgaido91, then I will run a full benchmark first. |
|
Thanks @peter-toth! |
|
Retest this please. |
|
Test build #102104 has finished for PR 23731 at commit
|
Shouldn't we fix join instead of aggregate? |
consume(ctx, eval)
val evaluateResultVars = evaluateVariables(resultVars)
s"""
$evaluateResultVars
For non-broadcast-join cases, the change will force evaluation unnecessarily too. We should move evaluation out of the loop in broadcast join, if possible.
What I'm a bit concerned about is: is it semantically OK to defer the evaluation of non-deterministic exprs if HashAggregateExec has these exprs?
I think, to fix this issue, it's OK to modify code on the join side if we could find a simpler solution there with no performance regression. But I have just a question about the design, regardless of this issue.
oh.. Kris answered my question.. in #23731 (review)
|
@mgaido91 @maropu @cloud-fan @viirya I've just collected the results of the full benchmark. Here are the benchmark results if you are interested:
|
|
@cloud-fan @viirya I am not sure fixing this in the join is a good idea. First of all, we have many kinds of joins, so we would likely need to touch all of them, and there may be operators other than joins that use loops. I don't think it is correct to delegate to the consumer the responsibility of computing variables when needed. It seems more reasonable to me to fix it in the aggregate, honestly. |
|
@mgaido91 are you sure aggregate is the only one that produces unevaluated result expressions? IIRC this is a long-standing optimization in the whole-stage codegen framework, and there is no such rule that operators must evaluate the result expressions before calling `consume()`. also cc @rednaxelafx @kiszk |
|
cc @dbtsai since he is the release manager for 2.4.1. |
This bug and fix touches a basic design area of Spark SQL's whole-stage codegen:
- Deterministic expressions can be evaluated anywhere as long as their inputs (data dependencies) are available, and are allowed to be evaluated multiple times (although from a performance point of view it's not preferred to evaluate them repeatedly); non-deterministic expressions have to be evaluated only once, and the order of evaluation should respect the order in the original query.
Two rules of thumb are:
- In the whole-stage codegen framework, the evaluation of a deterministic expression can be deferred to just before its result is used. To improve performance and reduce code size, we only expect output expressions that are used more than once to be eagerly evaluated. This "used more than once" is expressed by `CodegenSupport.usedInputs`, and `CodegenSupport.consume()` handles the eager evaluation of such expressions automatically. That's #11274, already mentioned in one of the comments above.
- Any physical plan operator that carries an output projection list, such as `ProjectExec` and in this case `HashAggregateExec`, has to perform the special treatment of forcing evaluation of non-deterministic expressions before passing the outputVars to `consume()`, to make sure the side effects are emitted in the correct order and not evaluated repeatedly in the parents' `doConsume()`. See `ProjectExec.doConsume()` for an example of what this special treatment should look like.
Note that `Stateful` expressions are `Nondeterministic` by design; the latter covers more expressions than the former.
The reason why this special treatment isn't done in the `CodegenSupport.consume()` framework function is that `consume()` only gets to see the outputVars from the child as a list of `ExprCode`s, but not the list of `Expression`s that produced the code. The former has lost the notion of whether the generated code is deterministic or not, which can only be found on the latter.
`consume()` also gets to see `child.output`, but that's a list of `Attribute`s, which carries no knowledge of whether the original expression was deterministic. So that doesn't help either.
With that, we'd have to perform the special treatment before calling consume().
This brings us to another related note: in the whole-stage codegen world, it really is preferred to host non-trivial expressions in `ProjectExec` as much as possible, so that we'd only have to do non-trivial expression handling in one place. Fusing the output projection list into a fat operator is a design from the past -- it would have helped reduce operator boundaries and thus materialization/operator-dispatch overhead in the Volcano model, but in the whole-stage codegen world that doesn't matter at all.
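The visibility argument above can be illustrated with a toy model in plain Scala (the class names mirror Spark's but are simplified stand-ins, not the real API): once expressions are lowered to `ExprCode`/`Attribute`, the `deterministic` flag is gone, so only the operator still holding the `Expression`s can decide what to force-evaluate.

```scala
object ConsumeVisibilitySketch {
  case class ExprCode(code: String, value: String)   // generated-code fragment
  case class Attribute(name: String)                 // schema-level reference

  // Only the Expression level knows about determinism.
  case class Expr(name: String, deterministic: Boolean) {
    def genCode: ExprCode = ExprCode(s"/* evaluate $name */", name)
    def toAttribute: Attribute = Attribute(name)
  }

  // What consume() receives: by this point determinism has been erased.
  def visibleToConsume(exprs: Seq[Expr]): (Seq[ExprCode], Seq[Attribute]) =
    (exprs.map(_.genCode), exprs.map(_.toAttribute))

  // Hence the operator still holding the Expressions must pick out, before
  // calling consume(), which results need forced evaluation.
  def mustForceEarly(exprs: Seq[Expr]): Seq[String] =
    exprs.filterNot(_.deterministic).map(_.name)
}
```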
Here's my suggested fix for HashAggregateExec:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 19a47ffc6d..be457b435b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -154,6 +154,14 @@ case class HashAggregateExec(
child.asInstanceOf[CodegenSupport].inputRDDs()
}
+ // Extract the code to evaluate non-deterministic expressions in the resultExpressions.
+ // NOTE: this function will mutate the state of the `ExprCode`s in `resultVars`: the `code` of
+ // non-deterministic expressions will be cleared.
+ private def evaluateNondeterministicResults(resultVars: Seq[ExprCode]): String = {
+ val nondeterministicAttrs = resultExpressions.filterNot(_.deterministic).map(_.toAttribute)
+ evaluateRequiredVariables(output, resultVars, AttributeSet(nondeterministicAttrs))
+ }
+
protected override def doProduce(ctx: CodegenContext): String = {
if (groupingExpressions.isEmpty) {
doProduceWithoutKeys(ctx)
@@ -208,8 +216,10 @@ case class HashAggregateExec(
// evaluate result expressions
ctx.currentVars = aggResults
val resultVars = bindReferences(resultExpressions, aggregateAttributes).map(_.genCode(ctx))
+ val evaluateNondeterministicAggResults = evaluateNondeterministicResults(resultVars)
(resultVars, s"""
|$evaluateAggResults
+ |$evaluateNondeterministicAggResults
|${evaluateVariables(resultVars)}
""".stripMargin)
} else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
@@ -466,10 +476,12 @@ case class HashAggregateExec(
val resultVars = bindReferences[Expression](
resultExpressions,
inputAttrs).map(_.genCode(ctx))
+ val evaluateNondeterministicAggResults = evaluateNondeterministicResults(resultVars)
s"""
$evaluateKeyVars
$evaluateBufferVars
$evaluateAggResults
+ $evaluateNondeterministicAggResults
${consume(ctx, resultVars)}
"""
} else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
@@ -506,10 +518,14 @@ case class HashAggregateExec(
// generate result based on grouping key
ctx.INPUT_ROW = keyTerm
ctx.currentVars = null
- val eval = bindReferences[Expression](
+ val resultVars = bindReferences[Expression](
resultExpressions,
groupingAttributes).map(_.genCode(ctx))
- consume(ctx, eval)
+ val evaluateNondeterministicResults = evaluateNondeterministicResults(resultVars)
+ s"""
+ |$evaluateNondeterministicResults
+ |${consume(ctx, resultVars)}
+ """.stripMargin
}
ctx.addNewFunction(funcName,
s"""|
I was thinking about why the following simple code snippet doesn't have the same issue: it produces the expected result. Oops, meanwhile we got the same answer. Thanks @rednaxelafx. |
|
Thanks, Kris. I'm just curious whether the @rednaxelafx approach has no performance regression. |
|
So, shall I adjust the fix as @rednaxelafx suggested and maybe run another benchmark? Any objections? |
|
@maropu : my proposed change won't introduce any performance regressions because what used to be both (1) correct and (2) fast will stay the same, no changes whatsoever; whereas what used to be incorrect will be fixed. |
|
Thanks for your comment @rednaxelafx , huge +1 on everything you just said.
@cloud-fan if it is not the only one, I think we have to fix the others too, but I don't think there are. |
|
@rednaxelafx I was just worried about performance numbers other than TPCDS, but that's certainly true. Thanks, Kris. nit: btw, could we move |
|
Thank you all for the comments and suggestions. |
rednaxelafx
left a comment
Mostly LGTM, with a comment in the test case.
 * Returns source code to evaluate the variables for non-deterministic expressions, and clear the
 * code of evaluated variables, to prevent them from being evaluated twice.
 */
protected def evaluateNondeterministicVariables(
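As an aside, the "evaluate and clear" behaviour this doc comment describes can be modelled in a few lines of plain Scala (a simplified stand-in, not Spark's real `ExprCode` or `evaluateVariables`):

```scala
object EvaluateAndClearSketch {
  // code is mutable on purpose: clearing it marks the variable as evaluated.
  final class ExprCode(var code: String, val value: String)

  // Emit the pending evaluation code for each variable, then blank it out so
  // the same code can never be emitted (and therefore executed) twice.
  def evaluateVariables(vars: Seq[ExprCode]): String = {
    val emitted = vars.filter(_.code.nonEmpty).map(_.code).mkString("\n")
    vars.foreach(_.code = "")
    emitted
  }
}
```

After the first call, downstream consumers that paste `value` into their generated code simply reuse the already-computed variable, which is what makes forcing evaluation before the join loop safe.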
Nitpick on naming: "variables" are never non-deterministic; only expressions can have the property of being deterministic or not. Two options:
- I'd prefer naming this utility function `evaluateNondeterministicResults` to emphasize that it should (mostly) be used on the results of an output projection list.
- But the existing utility function `evaluateRequiredVariables` uses the "variable" notion, so keeping consistency there is fine too.

I'm fine either way.
Also, historically Spark SQL's WSCG would use variable names like `eval` for the `ExprCode` type, e.g. `evals: Seq[ExprCode]`. Not sure why it started that way, but you can see that naming pattern throughout the WSCG code base.
Again, your new utility function follows the same names used in `evaluateRequiredVariables`, so that's fine. Local consistency is good enough.
To keep the naming consistent, +1 for `evaluateNondeterministicVariables`.
val baseTable = Seq((1), (1)).toDF("idx")

// BroadcastHashJoinExec with a HashAggregateExec child containing no aggregate expressions
val distinctWithId = baseTable.distinct().withColumn("id", monotonically_increasing_id())
I'm not sure how stable the results are going to be if you use `monotonically_increasing_id` here with an unspecified number of shuffle partitions. Since you're checking the exact value of the resulting id, if the number of shuffle partitions changes (say, someone decides to change the default shuffle partitions setting in all tests), this test can become fragile and fail unnecessarily.
It might be worth setting the number of shuffle partitions to 1 explicitly inside this test case. Or go back to grouping by id instead of checking the exact value of id, or just assert the ids are equal.
Also, how about wrapping it with `withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString)` as a safeguard.
Thanks. Fixed both.
|
Test build #102262 has finished for PR 23731 at commit
|
val distinctWithId = baseTable.distinct().withColumn("id", monotonically_increasing_id())
  .join(baseTable, "idx")
assert(distinctWithId.queryExecution.executedPlan.collectFirst {
  case BroadcastHashJoinExec(_, _, _, _, _, HashAggregateExec(_, _, Seq(), _, _, _, _), _) =>
How about this?
assert(distinctWithId.queryExecution.executedPlan.collectFirst {
  case j: BroadcastHashJoinExec if j.left.isInstanceOf[HashAggregateExec] => true
}.isDefined)
Do we need to strictly check aggregate exprs? It seems baseTable.distinct() obviously has no aggregate expr.
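For what it's worth, the structural-match style under discussion can be sketched on a toy plan tree (hypothetical classes, not Spark's real operators): the nested pattern asserts both the node types and that the stream-side aggregate has an empty aggregate-expression list.

```scala
object PlanMatchSketch {
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(name: String) extends Plan { val children = Nil }
  case class HashAggregate(aggExprs: Seq[String], child: Plan) extends Plan {
    val children = Seq(child)
  }
  case class BroadcastHashJoin(left: Plan, right: Plan) extends Plan {
    val children = Seq(left, right)
  }

  // Pre-order traversal, analogous to collecting nodes from an executedPlan.
  def collectAll(p: Plan): Seq[Plan] = p +: p.children.flatMap(collectAll)

  // Structural pattern match: checks node shape without isInstanceOf casts.
  def hasJoinOverPlainAggregate(p: Plan): Boolean =
    collectAll(p).collectFirst {
      case BroadcastHashJoin(HashAggregate(Seq(), _), _) => true
    }.isDefined
}
```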
I prefer avoiding isInstanceOf if possible, but changed it a bit.
mgaido91
left a comment
The fix itself looks fine to me. Just some comments on the test. May you please also re-run the benchmark for the query that had a considerable perf issue earlier, in order to confirm we now have no regression? Thanks.
  }
}

test("SPARK-26572: fix aggregate codegen result evaluation") {
Since this is a problem with whole-stage codegen, what about moving this test to WholeStageCodegenSuite? And adding an assert that whole-stage codegen is actually used, i.e. the HashAggregate is a child of WholeStageCodegenExec?
I'm fine with moving it to WholeStageCodegenSuite but the plan looks like:
*(3) Project [idx#4, id#6L]
+- *(3) BroadcastHashJoin [idx#4], [idx#9], Inner, BuildRight
:- *(3) HashAggregate(keys=[idx#4], functions=[], output=[idx#4, id#6L])
: +- Exchange hashpartitioning(idx#4, 1)
: +- *(1) HashAggregate(keys=[idx#4], functions=[], output=[idx#4])
: +- *(1) Project [value#1 AS idx#4]
: +- LocalTableScan [value#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *(2) Project [value#1 AS idx#9]
+- LocalTableScan [value#1]
so I guess you mean checking WholeStageCodegenExec has a ProjectExec child that has a BroadcastHashJoinExec child?
Moved and added WholeStageCodegenExec check.
|
Test build #102288 has finished for PR 23731 at commit
|
|
Hmm, the failing UT doesn't seem to be related to the changes in this PR. |
|
retest this please |
|
Test build #102292 has finished for PR 23731 at commit
|
@mgaido91, I checked that the PR now doesn't add a perf regression. |
|
LGTM |
|
Looks good and a minor comment about variable naming. |
Change-Id: I1a2c52e7ba30a186517d91568093da813f201d1f
|
Test build #102342 has finished for PR 23731 at commit
|
This PR is a correctness fix in `HashAggregateExec` code generation. It forces evaluation of result expressions before calling `consume()` to avoid multiple executions. This PR fixes a use case where an aggregate is nested into a broadcast join and appears on the "stream" side. The issue is that the broadcast join generates its own loop, and without forcing evaluation of the `resultExpressions` of `HashAggregateExec` before the join's loop, these expressions can be executed multiple times, giving incorrect results. A new UT was added. Closes #23731 from peter-toth/SPARK-26572. Authored-by: Peter Toth <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 2228ee5) Signed-off-by: Wenchen Fan <[email protected]>
|
thanks, merging to master/2.4/2.3! |
|
Thanks @cloud-fan @maropu @mgaido91 @rednaxelafx and @viirya for your review and help. |