-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and add a config to force apply #27452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -358,6 +358,15 @@ object SQLConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply") | ||
| .internal() | ||
| .doc("Adaptive query execution is skipped when the query does not have exchanges or " + | ||
| "sub-queries. By setting this config to true (together with " + | ||
| s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply adaptive query " + | ||
| "execution for all supported queries.") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To be clear, shall we mention like
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. By setting this config (together with spark.sql.adaptive.enabled) to true
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That sounds good. |
||
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED = | ||
| buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled") | ||
| .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " + | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,49 +40,60 @@ case class InsertAdaptiveSparkPlan( | |
|
|
||
| private val conf = adaptiveExecutionContext.session.sessionState.conf | ||
|
|
||
| def containShuffle(plan: SparkPlan): Boolean = { | ||
| plan.find { | ||
| case _: Exchange => true | ||
| case s: SparkPlan => !s.requiredChildDistribution.forall(_ == UnspecifiedDistribution) | ||
| }.isDefined | ||
| } | ||
|
|
||
| def containSubQuery(plan: SparkPlan): Boolean = { | ||
| plan.find(_.expressions.exists(_.find { | ||
| case _: SubqueryExpression => true | ||
| case _ => false | ||
| }.isDefined)).isDefined | ||
| } | ||
|
|
||
| override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false) | ||
|
|
||
| private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match { | ||
| case _ if !conf.adaptiveExecutionEnabled => plan | ||
| case _: ExecutedCommandExec => plan | ||
| case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) | ||
| && (isSubquery || containShuffle(plan) || containSubQuery(plan)) => | ||
| try { | ||
| // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall | ||
| // back to non-adaptive mode if adaptive execution is supported in any of the sub-queries. | ||
| val subqueryMap = buildSubqueryMap(plan) | ||
| val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap) | ||
| val preprocessingRules = Seq( | ||
| planSubqueriesRule) | ||
| // Run pre-processing rules. | ||
| val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules) | ||
| logDebug(s"Adaptive execution enabled for plan: $plan") | ||
| AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) | ||
| } catch { | ||
| case SubqueryAdaptiveNotSupportedException(subquery) => | ||
| logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + | ||
| s"but is not supported for sub-query: $subquery.") | ||
| plan | ||
| } | ||
| case _ => | ||
| if (conf.adaptiveExecutionEnabled) { | ||
| case _ if shouldApplyAQE(plan, isSubquery) => | ||
| if (supportAdaptive(plan)) { | ||
| try { | ||
| // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. | ||
| // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries. | ||
| val subqueryMap = buildSubqueryMap(plan) | ||
| val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap) | ||
| val preprocessingRules = Seq( | ||
| planSubqueriesRule) | ||
| // Run pre-processing rules. | ||
| val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules) | ||
| logDebug(s"Adaptive execution enabled for plan: $plan") | ||
| AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) | ||
| } catch { | ||
| case SubqueryAdaptiveNotSupportedException(subquery) => | ||
| logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + | ||
| s"but is not supported for sub-query: $subquery.") | ||
| plan | ||
| } | ||
| } else { | ||
| logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + | ||
| s"but is not supported for query: $plan.") | ||
| plan | ||
| } | ||
| plan | ||
|
|
||
| case _ => plan | ||
| } | ||
|
|
||
| // AQE is only useful when the query has exchanges or sub-queries. This method returns true if | ||
| // one of the following conditions is satisfied: | ||
| // - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true. | ||
| // - The input query is from a sub-query. When this happens, it means we've already decided to | ||
| // apply AQE for the main query and we must continue to do it. | ||
| // - The query contains exchanges. | ||
| // - The query may need to add exchanges. It's an overkill to run `EnsureRequirements` here, so | ||
| //   we just check `SparkPlan.requiredChildDistribution` and see if it's possible that | ||
| //   the query needs to add exchanges later. | ||
| // - The query contains sub-query. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is much clearer than the original code. |
||
| private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = { | ||
| conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || { | ||
| plan.find { | ||
| case _: Exchange => true | ||
| case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true | ||
| case p => p.expressions.exists(_.find { | ||
| case _: SubqueryExpression => true | ||
| case _ => false | ||
| }.isDefined) | ||
| }.isDefined | ||
| } | ||
| } | ||
|
|
||
| private def supportAdaptive(plan: SparkPlan): Boolean = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spark.sql.adaptive.forceApply.enabled? cc @gatorsmile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No... that sounds weird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maryannxue .
`.enabled` is a general guideline for boolean flags from @gatorsmile. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's usually
`xxx.featureName.enabled`, but `forceApply` is a verb. For example, `spark.sql.join.preferSortMergeJoin`. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh. Got it. I misunderstood the rule at that part until now. My bad. Thank you, @cloud-fan and @maryannxue .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have a policy that config name must be
`xxx.featureName.enabled`? At least for internal configs we follow the PR author's personal preference AFAIK. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should then stop renaming the configurations to add
`.enabled` (e.g., #27346, #27210, #26694). `reuse`, `ignore`, and `fail` can be verbs too. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile do we need to add ".enabled" post-fix to all boolean configs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this PR, it would be great if we have a documented policy for this, @gatorsmile and @cloud-fan .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the references, @HyukjinKwon .