address comments
cloud-fan committed Feb 5, 2020
commit 215435d1f63d8cbec17374cd5a340915a0844f3c
@@ -361,8 +361,8 @@ object SQLConf {
   val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply")
Member:
spark.sql.adaptive.forceApply.enabled?
cc @gatorsmile

Contributor:
No... that sounds weird.

Member:
> No... that sounds weird.

@maryannxue, .enabled is a general guideline for boolean flags, from @gatorsmile.

Contributor Author:
It's usually xxx.featureName.enabled, but forceApply is a verb. Compare, for example, spark.sql.join.preferSortMergeJoin.
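
For illustration, a minimal sketch of the two naming styles under discussion, written against Spark's internal ConfigBuilder DSL as it appears inside SQLConf (someFeature is a hypothetical name; preferSortMergeJoin is the real config cited above, whose default really is true):

  // Noun-like feature names take the ".enabled" suffix (hypothetical example):
  val SOME_FEATURE_ENABLED = buildConf("spark.sql.someFeature.enabled")
    .booleanConf
    .createWithDefault(false)

  // Verb-like flags read naturally without the suffix, as with the real
  // spark.sql.join.preferSortMergeJoin:
  val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
    .booleanConf
    .createWithDefault(true)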

Member:
Oh, got it. I had misunderstood that part of the rule until now. My bad. Thank you, @cloud-fan and @maryannxue.

Contributor Author:
Do we have a policy that config names must be xxx.featureName.enabled? At least for internal configs, we follow the PR author's personal preference, AFAIK.

Member:
We should then stop renaming configurations to add .enabled (e.g., #27346, #27210, #26694). reuse, ignore, and fail can be verbs too.

Contributor Author:
@gatorsmile, do we need to add the ".enabled" suffix to all boolean configs?

Member:
After this PR, it would be great to have a documented policy for this, @gatorsmile and @cloud-fan.

Member:
Thank you for the references, @HyukjinKwon.

     .internal()
     .doc("Adaptive query execution is skipped when the query does not have exchanges or " +
-      "subqueries. By setting this config to true, Spark will be forced to apply adaptive " +
-      "query execution even if the query doesn't have exchange/subquery.")
+      "subqueries. By setting this config to true, Spark will force apply adaptive query " +
+      "execution for all supported queries.")
Member (@dongjoon-hyun, Feb 5, 2020):
To be clear, shall we mention something like "By setting both spark.sql.adaptive.enabled and this config to true"?

Contributor:
By setting this config (together with spark.sql.adaptive.enabled) to true

Member:
That sounds good.

     .booleanConf
     .createWithDefault(false)
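
For context, a hypothetical usage sketch (assuming a live SparkSession named spark): the first case in applyInternal below bails out whenever adaptive execution is disabled, so this internal flag only has an effect when spark.sql.adaptive.enabled is also true.

  // Both flags are needed for AQE to be force-applied:
  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.forceApply", "true")  // internal config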

@@ -40,39 +40,12 @@ case class InsertAdaptiveSparkPlan(

   private val conf = adaptiveExecutionContext.session.sessionState.conf

-  private def mayContainExchange(plan: SparkPlan): Boolean = {
-    plan.find {
-      case _: Exchange => true
-      case s: SparkPlan => !s.requiredChildDistribution.forall(_ == UnspecifiedDistribution)
-    }.isDefined
-  }
-
-  private def containSubQuery(plan: SparkPlan): Boolean = {
-    plan.find(_.expressions.exists(_.find {
-      case _: SubqueryExpression => true
-      case _ => false
-    }.isDefined)).isDefined
-  }
-
-  // AQE is only useful when the query has exchanges or sub-queries. This method returns true if one
-  // of the following conditions is satisfied:
-  // - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true.
-  // - The input query is from a sub-query. When this happens, it means we've already decided to
-  //   apply AQE for the main query and we must continue to do it.
-  // - The query may contain exchanges. The exchanges are not added yet at this point, and we can
-  //   only know whether the query may contain exchanges by checking
-  //   `SparkPlan.requiredChildDistribution`.
-  // - The query contains sub-queries.
-  private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
-    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery ||
-      mayContainExchange(plan) || containSubQuery(plan)
-  }
-
   override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

   private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
     case _ if !conf.adaptiveExecutionEnabled => plan
     case _: ExecutedCommandExec => plan
-    case _ if conf.adaptiveExecutionEnabled && shouldApplyAQE(plan, isSubquery) =>
+    case _ if shouldApplyAQE(plan, isSubquery) =>
       if (supportAdaptive(plan)) {
         try {
           // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
@@ -100,6 +73,29 @@ case class InsertAdaptiveSparkPlan(
     case _ => plan
   }

+  // AQE is only useful when the query has exchanges or sub-queries. This method returns true if
+  // one of the following conditions is satisfied:
+  // - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true.
+  // - The input query is from a sub-query. When this happens, it means we've already decided to
+  //   apply AQE for the main query and we must continue to do it.
+  // - The query contains exchanges.
+  // - The query may need to add exchanges. It's overkill to run `EnsureRequirements` here, so
+  //   we just check `SparkPlan.requiredChildDistribution` and see if it's possible that the
+  //   query needs to add exchanges later.
+  // - The query contains sub-queries.
Member:
This is much clearer than the original code.

+  private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
+    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
+      plan.find {
+        case _: Exchange => true
+        case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
+        case p => p.expressions.exists(_.find {
+          case _: SubqueryExpression => true
+          case _ => false
+        }.isDefined)
+      }.isDefined
+    }
+  }
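
As an aside, the method above leans on TreeNode.find, a pre-order search that returns the first node satisfying a predicate. A minimal self-contained sketch of that idiom (toy types, not Spark code; every name here is invented):

  sealed trait Node { def children: Seq[Node] }
  case class Exchange(children: Seq[Node] = Nil) extends Node
  case class Scan() extends Node { def children: Seq[Node] = Nil }
  case class Project(children: Seq[Node]) extends Node

  // Depth-first, pre-order: check the node itself, then recurse into children.
  def find(n: Node)(p: Node => Boolean): Option[Node] =
    if (p(n)) Some(n)
    else n.children.iterator.map(c => find(c)(p)).collectFirst { case Some(m) => m }

  // Mirrors the `plan.find { ... }.isDefined` check above:
  val plan = Project(Seq(Exchange(Seq(Scan()))))
  assert(find(plan) {
    case _: Exchange => true
    case _ => false
  }.isDefined)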

   private def supportAdaptive(plan: SparkPlan): Boolean = {
     // TODO migrate dynamic-partition-pruning onto adaptive execution.
     sanityCheck(plan) &&