@@ -358,6 +358,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply")
Member: spark.sql.adaptive.forceApply.enabled?
cc @gatorsmile

Contributor: No... that sounds weird.

Member:
> No... that sounds weird.

@maryannxue, .enabled is a general guideline for boolean flags from @gatorsmile.

Contributor Author: It's usually xxx.featureName.enabled, but forceApply is a verb. For example, spark.sql.join.preferSortMergeJoin.

Member: Oh, got it. I misunderstood that part of the rule until now. My bad. Thank you, @cloud-fan and @maryannxue.

Contributor Author: Do we have a policy that a config name must be xxx.featureName.enabled? At least for internal configs we follow the PR author's personal preference, AFAIK.

Member: We should then stop renaming configurations to add .enabled (e.g., #27346, #27210, #26694). reuse, ignore, and fail can be verbs too.

Contributor Author: @gatorsmile, do we need to add the ".enabled" suffix to all boolean configs?

Member: After this PR, it would be great if we had a documented policy for this, @gatorsmile and @cloud-fan.

Member: Thank you for the references, @HyukjinKwon.

+    .internal()
+    .doc("Adaptive query execution is skipped when the query does not have exchanges or " +
+      "subqueries. By setting this config to true, Spark will force apply adaptive query " +
+      "execution for all supported queries.")
Member (@dongjoon-hyun, Feb 5, 2020): To be clear, shall we say something like "By setting both spark.sql.adaptive.enabled and this config to true, ..."?

Contributor: "By setting this config (together with spark.sql.adaptive.enabled) to true"

Member: That sounds good.

+    .booleanConf
+    .createWithDefault(false)
+
   val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
     buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
       .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " +
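For orientation, here is a hedged sketch of how the new flag could be exercised from a user session. The two config keys come from the diff above; the session setup, view name, and expected output are illustrative assumptions, not part of the PR:

```scala
// Illustrative only: exercising the new flag from a SparkSession.
// Config keys are from the diff above; everything else is generic setup.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-force-apply-demo")
  .getOrCreate()

// AQE itself must be on for the force-apply flag to matter.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Force AQE even for queries without exchanges or subqueries
// (an internal config, default false per the diff above).
spark.conf.set("spark.sql.adaptive.forceApply", "true")

// A plain scan has no exchange, so without forceApply it would be
// planned without an AdaptiveSparkPlanExec wrapper.
spark.range(10).createOrReplaceTempView("t")
val plan = spark.sql("SELECT * FROM t").queryExecution.executedPlan
println(plan.getClass.getSimpleName) // expected: AdaptiveSparkPlanExec
```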
@@ -40,49 +40,60 @@ case class InsertAdaptiveSparkPlan(

   private val conf = adaptiveExecutionContext.session.sessionState.conf
 
-  def containShuffle(plan: SparkPlan): Boolean = {
-    plan.find {
-      case _: Exchange => true
-      case s: SparkPlan => !s.requiredChildDistribution.forall(_ == UnspecifiedDistribution)
-    }.isDefined
-  }
-
-  def containSubQuery(plan: SparkPlan): Boolean = {
-    plan.find(_.expressions.exists(_.find {
-      case _: SubqueryExpression => true
-      case _ => false
-    }.isDefined)).isDefined
-  }
-
   override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)
 
   private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
+    case _ if !conf.adaptiveExecutionEnabled => plan
     case _: ExecutedCommandExec => plan
-    case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan)
-      && (isSubquery || containShuffle(plan) || containSubQuery(plan)) =>
-      try {
-        // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall
-        // back to non-adaptive mode if adaptive execution is supported in any of the sub-queries.
-        val subqueryMap = buildSubqueryMap(plan)
-        val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
-        val preprocessingRules = Seq(
-          planSubqueriesRule)
-        // Run pre-processing rules.
-        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
-        logDebug(s"Adaptive execution enabled for plan: $plan")
-        AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
-      } catch {
-        case SubqueryAdaptiveNotSupportedException(subquery) =>
-          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
-            s"but is not supported for sub-query: $subquery.")
-          plan
-      }
-    case _ =>
-      if (conf.adaptiveExecutionEnabled) {
+    case _ if shouldApplyAQE(plan, isSubquery) =>
+      if (supportAdaptive(plan)) {
+        try {
+          // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
+          // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
+          val subqueryMap = buildSubqueryMap(plan)
+          val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
+          val preprocessingRules = Seq(
+            planSubqueriesRule)
+          // Run pre-processing rules.
+          val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
+          logDebug(s"Adaptive execution enabled for plan: $plan")
+          AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
+        } catch {
+          case SubqueryAdaptiveNotSupportedException(subquery) =>
+            logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
+              s"but is not supported for sub-query: $subquery.")
+            plan
+        }
+      } else {
         logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
           s"but is not supported for query: $plan.")
+        plan
       }
-      plan
+
+    case _ => plan
   }

+  // AQE is only useful when the query has exchanges or sub-queries. This method returns true if
+  // one of the following conditions is satisfied:
+  //   - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true.
+  //   - The input query is from a sub-query. When this happens, it means we've already decided to
+  //     apply AQE for the main query and we must continue to do it.
+  //   - The query contains exchanges.
+  //   - The query may need to add exchanges. It's overkill to run `EnsureRequirements` here, so
+  //     we just check `SparkPlan.requiredChildDistribution` and see if it's possible that the
+  //     query needs to add exchanges later.
+  //   - The query contains sub-queries.
Member: This is much clearer than the original code.
+  private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
+    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
+      plan.find {
+        case _: Exchange => true
+        case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
+        case p => p.expressions.exists(_.find {
+          case _: SubqueryExpression => true
+          case _ => false
+        }.isDefined)
+      }.isDefined
+    }
+  }

   private def supportAdaptive(plan: SparkPlan): Boolean = {
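To make the decision concrete outside Spark, here is a minimal self-contained sketch of the same three-way check. `ToyPlan`, `ToyExchange`, and `ToyNode` are hypothetical stand-ins for Spark's plan classes; only the shape of the logic mirrors `shouldApplyAQE` above:

```scala
// Illustrative sketch only: a toy model of shouldApplyAQE's decision.
// ToyPlan, ToyExchange, and ToyNode are hypothetical, not Spark classes.
sealed trait ToyPlan {
  def children: Seq[ToyPlan]
  def needsDistribution: Boolean  // models requiredChildDistribution != Unspecified
  def hasSubqueryExpr: Boolean    // models a SubqueryExpression among this node's expressions
  // Pre-order search, like SparkPlan.find.
  def find(f: ToyPlan => Boolean): Option[ToyPlan] =
    if (f(this)) Some(this) else children.view.flatMap(_.find(f)).headOption
}

case class ToyExchange(child: ToyPlan) extends ToyPlan {
  val children: Seq[ToyPlan] = Seq(child)
  val needsDistribution: Boolean = false
  val hasSubqueryExpr: Boolean = false
}

case class ToyNode(
    children: Seq[ToyPlan],
    needsDistribution: Boolean = false,
    hasSubqueryExpr: Boolean = false) extends ToyPlan

def shouldApplyAQE(plan: ToyPlan, forceApply: Boolean, isSubquery: Boolean): Boolean =
  forceApply || isSubquery || plan.find {
    case _: ToyExchange => true            // the query already contains an exchange
    case p if p.needsDistribution => true  // it may need to add an exchange later
    case p => p.hasSubqueryExpr            // it contains a sub-query expression
  }.isDefined

// A bare scan: AQE is skipped unless forced.
val scan = ToyNode(Nil)
assert(!shouldApplyAQE(scan, forceApply = false, isSubquery = false))
assert(shouldApplyAQE(scan, forceApply = true, isSubquery = false))
// A plan containing an exchange qualifies on its own.
assert(shouldApplyAQE(ToyExchange(scan), forceApply = false, isSubquery = false))
```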
@@ -780,4 +780,13 @@ class AdaptiveQueryExecSuite
       )
     }
   }
+
+  test("force apply AQE") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      val plan = sql("SELECT * FROM testData").queryExecution.executedPlan
+      assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
+    }
+  }
 }
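A companion negative test could pin down the default behavior stated in the config doc (AQE skipped when the query has no exchanges or subqueries). This sketch is hypothetical, not part of the PR; it reuses the suite's existing helpers and testData table:

```scala
// Hypothetical companion test (not in the PR): with forceApply at its
// default of false, a plain scan with no exchanges or sub-queries
// should be planned without the AdaptiveSparkPlanExec wrapper.
test("AQE is skipped for queries without exchanges or subqueries") {
  withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "false") {
    val plan = sql("SELECT * FROM testData").queryExecution.executedPlan
    assert(!plan.isInstanceOf[AdaptiveSparkPlanExec])
  }
}
```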