-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and add a config to force apply #27452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -358,6 +358,14 @@ object SQLConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply") | ||
| .internal() | ||
| .doc("Adaptive query execution is skipped when the query does not have exchanges or " + | ||
| "subqueries. By setting this config to true, Spark will be forced to apply adaptive " + | ||
|
||
| "query execution even if the query doesn't have exchange/subquery.") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED = | ||
| buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled") | ||
| .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " + | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,49 +40,64 @@ case class InsertAdaptiveSparkPlan( | |
|
|
||
| private val conf = adaptiveExecutionContext.session.sessionState.conf | ||
|
|
||
| def containShuffle(plan: SparkPlan): Boolean = { | ||
| private def mayContainExchange(plan: SparkPlan): Boolean = { | ||
| plan.find { | ||
| case _: Exchange => true | ||
| case s: SparkPlan => !s.requiredChildDistribution.forall(_ == UnspecifiedDistribution) | ||
| }.isDefined | ||
| } | ||
|
|
||
| def containSubQuery(plan: SparkPlan): Boolean = { | ||
| private def containSubQuery(plan: SparkPlan): Boolean = { | ||
| plan.find(_.expressions.exists(_.find { | ||
| case _: SubqueryExpression => true | ||
| case _ => false | ||
| }.isDefined)).isDefined | ||
| } | ||
|
|
||
| // AQE is only useful when the query has exchanges or sub-queries. This method returns true if one | ||
| // of the following conditions is satisfied: | ||
| // - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true. | ||
| // - The input query is from a sub-query. When this happens, it means we've already decided to | ||
| // apply AQE for the main query and we must continue to do it. | ||
| // - The query may contains exchange. The exchanges are not added yet at this point and we can | ||
| // only know if the query may contain exchange or not by checking | ||
| // `SparkPlan.requiredChildDistribution`. | ||
| // - The query contains sub-query. | ||
| private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = { | ||
|
||
| conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || | ||
|
||
| mayContainExchange(plan) || containSubQuery(plan) | ||
| } | ||
|
|
||
| override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false) | ||
|
|
||
| private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match { | ||
| case _: ExecutedCommandExec => plan | ||
| case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) | ||
| && (isSubquery || containShuffle(plan) || containSubQuery(plan)) => | ||
| try { | ||
| // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall | ||
| // back to non-adaptive mode if adaptive execution is supported in any of the sub-queries. | ||
| val subqueryMap = buildSubqueryMap(plan) | ||
| val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap) | ||
| val preprocessingRules = Seq( | ||
| planSubqueriesRule) | ||
| // Run pre-processing rules. | ||
| val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules) | ||
| logDebug(s"Adaptive execution enabled for plan: $plan") | ||
| AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) | ||
| } catch { | ||
| case SubqueryAdaptiveNotSupportedException(subquery) => | ||
| logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + | ||
| s"but is not supported for sub-query: $subquery.") | ||
| plan | ||
| } | ||
| case _ => | ||
| if (conf.adaptiveExecutionEnabled) { | ||
| case _ if conf.adaptiveExecutionEnabled && shouldApplyAQE(plan, isSubquery) => | ||
| if (supportAdaptive(plan)) { | ||
| try { | ||
| // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. | ||
| // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries. | ||
| val subqueryMap = buildSubqueryMap(plan) | ||
| val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap) | ||
| val preprocessingRules = Seq( | ||
| planSubqueriesRule) | ||
| // Run pre-processing rules. | ||
| val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules) | ||
| logDebug(s"Adaptive execution enabled for plan: $plan") | ||
| AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) | ||
| } catch { | ||
| case SubqueryAdaptiveNotSupportedException(subquery) => | ||
| logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + | ||
| s"but is not supported for sub-query: $subquery.") | ||
| plan | ||
| } | ||
| } else { | ||
| logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + | ||
| s"but is not supported for query: $plan.") | ||
| plan | ||
| } | ||
| plan | ||
|
|
||
| case _ => plan | ||
| } | ||
|
|
||
| private def supportAdaptive(plan: SparkPlan): Boolean = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`spark.sql.adaptive.forceApply.enabled`? cc @gatorsmile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No... that sounds weird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maryannxue .
`.enabled` is a general guideline for boolean flags from @gatorsmile. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's usually
`xxx.featureName.enabled`, but `forceApply` is a verb. For example, `spark.sql.join.preferSortMergeJoin`. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh. Got it. I misunderstood the rule at that part until now. My bad. Thank you, @cloud-fan and @maryannxue .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have a policy that config name must be
`xxx.featureName.enabled`? At least for internal configs we follow the PR author's personal preference AFAIK. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should then stop renaming the configurations to add
`.enabled` (e.g., #27346, #27210, #26694). `reuse`, `ignore`, and `fail` can each be a verb too. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile do we need to add a ".enabled" suffix to all boolean configs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this PR, it would be great if we have a documented policy for this, @gatorsmile and @cloud-fan .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the references, @HyukjinKwon .