Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS =
buildConf("spark.sql.adaptive.customCostEvaluatorClass")
.doc("The custom cost evaluator class to be used for adaptive execution. If not being set," +
" Spark will use its own SimpleCostEvaluator by default.")
.version("3.2.0")
Copy link
Member

@HyukjinKwon HyukjinKwon Jul 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing is that the version has to be 3.3.0, since we have cut the branch now. Since this PR is unlikely to affect anything in the main code, I am okay with merging it to 3.2.0 too, though. I will leave it to @cloud-fan and you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3.2 is the first version that enables AQE by default, and this seems to be a useful extension. Let's include it in 3.2.

.stringConf
.createOptional

val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ case class AdaptiveSparkPlanExec(
}
}

@transient private val costEvaluator = SimpleCostEvaluator
// Use the evaluator class named by the custom-cost-evaluator config when it is set;
// otherwise fall back to the built-in SimpleCostEvaluator.
@transient private val costEvaluator =
  conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS)
    .map(customClassName =>
      CostEvaluator.instantiate(customClassName, session.sparkContext.getConf))
    .getOrElse(SimpleCostEvaluator)

@transient val initialPlan = context.session.withActive {
applyPhysicalRules(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,42 @@

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.SparkConf
import org.apache.spark.annotation.Unstable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
* An interface to represent the cost of a plan.
*
* It extends `Ordered[Cost]`, so every implementation has to define how one cost
* compares to another.
*
* @note This class is subject to be changed and/or moved in the near future.
*/
@Unstable
trait Cost extends Ordered[Cost]

/**
* An interface to evaluate the cost of a physical plan.
*
* @note This class is subject to be changed and/or moved in the near future.
*/
@Unstable
trait CostEvaluator {
/**
* Evaluates the cost of the given physical plan.
*
* @param plan the physical plan to evaluate
* @return the [[Cost]] of `plan`
*/
def evaluateCost(plan: SparkPlan): Cost
}

object CostEvaluator extends Logging {

  /**
   * Loads the class named `className` as a [[CostEvaluator]] extension and returns it.
   *
   * @param className name of the class to load as a [[CostEvaluator]]
   * @param conf Spark configuration handed to the extension loader
   * @return the first evaluator produced by the extension loader
   * @throws IllegalArgumentException if loading `className` produced no evaluator
   */
  def instantiate(className: String, conf: SparkConf): CostEvaluator = {
    logDebug(s"Creating CostEvaluator $className")

    // Built only when the requirement below fails.
    def noEvaluatorMessage: String =
      "A valid AQE cost evaluator must be specified by config " +
        s"${SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key}, but $className resulted in zero " +
        "valid evaluator."

    val loaded = Utils.loadExtensions(classOf[CostEvaluator], Seq(className), conf)
    require(loaded.nonEmpty, noEvaluatorMessage)
    loaded.head
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1898,4 +1898,54 @@ class AdaptiveQueryExecSuite
assert(coalesceReader.head.partitionSpecs.length == 1)
}
}

test("SPARK-35794: Allow custom plugin for cost evaluator") {
CostEvaluator.instantiate(
classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf)
intercept[IllegalArgumentException] {
CostEvaluator.instantiate(
classOf[InvalidCostEvaluator].getCanonicalName, spark.sparkContext.getConf)
}

withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
val query = "SELECT * FROM testData join testData2 ON key = a where value = '1'"

withSQLConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key ->
"org.apache.spark.sql.execution.adaptive.SimpleShuffleSortCostEvaluator") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this custom cost evaluator change the query plan? It seems to produce the same plan as the built-in cost evaluator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - this evaluator does not change the plan; it produces the same plan as the built-in evaluator for this query. Do we want to come up with a different one here? I think this test just validates that the custom evaluator works.

Copy link
Contributor

@cloud-fan cloud-fan Jul 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, let's leave it then

val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
val smj = findTopLevelSortMergeJoin(plan)
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
}

withSQLConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key ->
"org.apache.spark.sql.execution.adaptive.InvalidCostEvaluator") {
intercept[IllegalArgumentException] {
runAdaptiveAndVerifyResult(query)
}
}
}
}
}

/**
* Invalid implementation class for [[CostEvaluator]].
*
* It deliberately does not extend [[CostEvaluator]]; the test in this file expects an
* IllegalArgumentException when this class is configured as the cost evaluator.
*/
private class InvalidCostEvaluator() {}

/**
 * A simple [[CostEvaluator]] whose cost is the number of [[ShuffleExchangeLike]] and
 * [[SortExec]] nodes collected from the plan.
 */
private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator {
  override def evaluateCost(plan: SparkPlan): Cost = {
    // Each matching node is collected once, whichever of the two types it matches.
    val shuffleAndSortNodes = plan.collect {
      case node @ (_: ShuffleExchangeLike | _: SortExec) => node
    }
    SimpleCost(shuffleAndSortNodes.size)
  }
}