diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a930f638ca66..0020faed79ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -677,6 +677,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN = + buildConf("spark.sql.adaptive.forceOptimizeSkewedJoin") + .doc("When true, force enable OptimizeSkewedJoin even if it introduces extra shuffle.") + .version("3.3.0") + .booleanConf + .createWithDefault(false) + val ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS = buildConf("spark.sql.adaptive.customCostEvaluatorClass") .doc("The custom cost evaluator class to be used for adaptive execution. If not being set," + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index bf810f3f6587..13c9528323ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -97,27 +97,36 @@ case class AdaptiveSparkPlanExec( AQEUtils.getRequiredDistribution(inputPlan) } + @transient private val costEvaluator = + conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match { + case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf) + case _ => SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)) + } + // A list of physical plan rules to be applied before creation of query stages. The physical // plan should reach a final status of query stages (i.e., no more addition or removal of // Exchange nodes) after running these rules. 
- @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( - RemoveRedundantProjects, + @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = { // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for // the final plan, but we do need to respect the user-specified repartition. Here we ask // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work // around this case. - EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined), - RemoveRedundantSorts, - DisableUnnecessaryBucketedScan - ) ++ context.session.sessionState.queryStagePrepRules + val ensureRequirements = + EnsureRequirements(requiredDistribution.isDefined, requiredDistribution) + Seq( + RemoveRedundantProjects, + ensureRequirements, + RemoveRedundantSorts, + DisableUnnecessaryBucketedScan, + OptimizeSkewedJoin(ensureRequirements, costEvaluator) + ) ++ context.session.sessionState.queryStagePrepRules + } // A list of physical optimizer rules to be applied to a new stage before its execution. These // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( PlanAdaptiveDynamicPruningFilters(this), ReuseAdaptiveSubquery(context.subqueryCache), - // Skew join does not handle `AQEShuffleRead` so needs to be applied first. 
- OptimizeSkewedJoin, OptimizeSkewInRebalancePartitions, CoalesceShufflePartitions(context.session), // `OptimizeShuffleWithLocalRead` needs to make use of 'AQEShuffleReadExec.partitionSpecs' @@ -169,12 +178,6 @@ case class AdaptiveSparkPlanExec( optimized } - @transient private val costEvaluator = - conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match { - case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf) - case _ => SimpleCostEvaluator - } - @transient val initialPlan = context.session.withActive { applyPhysicalRules( inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations"))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 88abe68197be..2fe5b18a75ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -22,8 +22,9 @@ import scala.collection.mutable import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleOrigin} +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, EnsureRequirements} import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf @@ -48,9 +49,10 @@ import org.apache.spark.sql.internal.SQLConf * (L3, R3-1), (L3, R3-2), * (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2) */ -object OptimizeSkewedJoin extends AQEShuffleReadRule { - - override val supportedShuffleOrigins: Seq[ShuffleOrigin] = Seq(ENSURE_REQUIREMENTS) +case class OptimizeSkewedJoin( + ensureRequirements: EnsureRequirements, + costEvaluator: 
CostEvaluator) + extends Rule[SparkPlan] { /** * A partition is considered as a skewed partition if its size is larger than the median @@ -250,7 +252,17 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule { // SHJ // Shuffle // Shuffle - optimizeSkewJoin(plan) + val optimized = ensureRequirements.apply(optimizeSkewJoin(plan)) + val originCost = costEvaluator.evaluateCost(plan) + val optimizedCost = costEvaluator.evaluateCost(optimized) + // We pick the new plan in two cases: + // 1. optimize the skew join without extra shuffle + // 2. optimize the skew join with extra shuffle but the costEvaluator thinks it's better + if (optimizedCost <= originCost) { + optimized + } else { + plan + } } else { plan } @@ -258,7 +270,8 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule { object ShuffleStage { def unapply(plan: SparkPlan): Option[ShuffleQueryStageExec] = plan match { - case s: ShuffleQueryStageExec if s.mapStats.isDefined && isSupported(s.shuffle) => + case s: ShuffleQueryStageExec if s.isMaterialized && s.mapStats.isDefined && + s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => Some(s) case _ => None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala index 7f026835dcf4..864563be3855 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike +import org.apache.spark.sql.execution.joins.ShuffledJoin /** * A simple implementation of [[Cost]], which takes a number of [[Long]] as the cost value. 
@@ -35,15 +36,52 @@ case class SimpleCost(value: Long) extends Cost { } /** - * A simple implementation of [[CostEvaluator]], which counts the number of - * [[ShuffleExchangeLike]] nodes in the plan. + * A skew join aware implementation of [[Cost]], which considers shuffle and skew join counts. + * + * The cost with more skew joins is always lower, even if it has one or more extra shuffles. + * Otherwise, if two costs have the same number of skew joins (possibly zero), the one with fewer + * shuffles is lower. */ -object SimpleCostEvaluator extends CostEvaluator { +case class SkewJoinAwareCost( + numShuffles: Int, + numSkewJoins: Int) extends Cost { + override def compare(that: Cost): Int = that match { + case other: SkewJoinAwareCost => + // More optimized skew joins, or fewer shuffle nodes on a tie, means the cost is lower + if (numSkewJoins > other.numSkewJoins) { + -1 + } else if (numSkewJoins < other.numSkewJoins) { + 1 + } else if (numShuffles < other.numShuffles) { + -1 + } else if (numShuffles > other.numShuffles) { + 1 + } else { + 0 + } + + case _ => + throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString) + } +} +/** + * A skew join aware implementation of [[CostEvaluator]], which counts the number of + * [[ShuffleExchangeLike]] nodes and, if `forceOptimizeSkewedJoin` is true, skew joins in the plan. 
+ */ +case class SimpleCostEvaluator(forceOptimizeSkewedJoin: Boolean) extends CostEvaluator { override def evaluateCost(plan: SparkPlan): Cost = { - val cost = plan.collect { + val numShuffles = plan.collect { case s: ShuffleExchangeLike => s }.size - SimpleCost(cost) + + if (forceOptimizeSkewedJoin) { + val numSkewJoins = plan.collect { + case j: ShuffledJoin if j.isSkewJoin => j + }.size + SkewJoinAwareCost(numShuffles, numSkewJoins) + } else { + SimpleCost(numShuffles) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 23716f1081d3..86b2344629d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -38,18 +38,23 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin * but can be false in AQE when AQE optimization may change the plan * output partitioning and need to retain the user-specified * repartition shuffles in the plan. + * @param requiredDistribution The root required distribution we should ensure. This value is used + * in AQE in case we change final stage output partitioning. 
*/ -case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Rule[SparkPlan] { - - private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { - val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution - val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering - var children: Seq[SparkPlan] = operator.children - assert(requiredChildDistributions.length == children.length) - assert(requiredChildOrderings.length == children.length) +case class EnsureRequirements( + optimizeOutRepartition: Boolean = true, + requiredDistribution: Option[Distribution] = None) + extends Rule[SparkPlan] { + private def ensureDistributionAndOrdering( + originalChildren: Seq[SparkPlan], + requiredChildDistributions: Seq[Distribution], + requiredChildOrderings: Seq[Seq[SortOrder]], + shuffleOrigin: ShuffleOrigin): Seq[SparkPlan] = { + assert(requiredChildDistributions.length == originalChildren.length) + assert(requiredChildOrderings.length == originalChildren.length) // Ensure that the operator's children satisfy their output distribution requirements. 
- children = children.zip(requiredChildDistributions).map { + var newChildren = originalChildren.zip(requiredChildDistributions).map { case (child, distribution) if child.outputPartitioning.satisfies(distribution) => child case (child, BroadcastDistribution(mode)) => @@ -57,7 +62,7 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru case (child, distribution) => val numPartitions = distribution.requiredNumPartitions .getOrElse(conf.numShufflePartitions) - ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) + ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child, shuffleOrigin) } // Get the indexes of children which have specified distribution requirements and need to have @@ -69,7 +74,7 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru }.map(_._2) val childrenNumPartitions = - childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet + childrenIndexes.map(newChildren(_).outputPartitioning.numPartitions).toSet if (childrenNumPartitions.size > 1) { // Get the number of partitions which is explicitly required by the distributions. @@ -78,7 +83,7 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru index => requiredChildDistributions(index).requiredNumPartitions }.toSet assert(numPartitionsSet.size <= 1, - s"$operator have incompatible requirements of the number of partitions for its children") + s"$requiredChildDistributions have incompatible requirements of the number of partitions") numPartitionsSet.headOption } @@ -87,7 +92,7 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru // 1. We should avoid shuffling these children. // 2. We should have a reasonable parallelism. 
val nonShuffleChildrenNumPartitions = - childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec]) + childrenIndexes.map(newChildren).filterNot(_.isInstanceOf[ShuffleExchangeExec]) .map(_.outputPartitioning.numPartitions) val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) { if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) { @@ -106,7 +111,7 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions) - children = children.zip(requiredChildDistributions).zipWithIndex.map { + newChildren = newChildren.zip(requiredChildDistributions).zipWithIndex.map { case ((child, distribution), index) if childrenIndexes.contains(index) => if (child.outputPartitioning.numPartitions == targetNumPartitions) { child @@ -124,7 +129,7 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru } // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: - children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => + newChildren = newChildren.zip(requiredChildOrderings).map { case (child, requiredOrdering) => // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort. 
if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) { child @@ -133,7 +138,7 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru } } - operator.withNewChildren(children) + newChildren } private def reorder( @@ -254,25 +259,50 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru } } - def apply(plan: SparkPlan): SparkPlan = plan.transformUp { - case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin) - if optimizeOutRepartition && - (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == REPARTITION_BY_NUM) => - def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean = { - partitioning match { - case lower: HashPartitioning if upper.semanticEquals(lower) => true - case lower: PartitioningCollection => - lower.partitionings.exists(hasSemanticEqualPartitioning) - case _ => false + def apply(plan: SparkPlan): SparkPlan = { + val newPlan = plan.transformUp { + case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin) + if optimizeOutRepartition && + (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == REPARTITION_BY_NUM) => + def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean = { + partitioning match { + case lower: HashPartitioning if upper.semanticEquals(lower) => true + case lower: PartitioningCollection => + lower.partitionings.exists(hasSemanticEqualPartitioning) + case _ => false + } } - } - if (hasSemanticEqualPartitioning(child.outputPartitioning)) { - child + if (hasSemanticEqualPartitioning(child.outputPartitioning)) { + child + } else { + operator + } + + case operator: SparkPlan => + val reordered = reorderJoinPredicates(operator) + val newChildren = ensureDistributionAndOrdering( + reordered.children, + reordered.requiredChildDistribution, + reordered.requiredChildOrdering, + ENSURE_REQUIREMENTS) + reordered.withNewChildren(newChildren) + } + + if (requiredDistribution.isDefined) { + val 
shuffleOrigin = if (requiredDistribution.get.requiredNumPartitions.isDefined) { + REPARTITION_BY_NUM } else { - operator + REPARTITION_BY_COL } - - case operator: SparkPlan => - ensureDistributionAndOrdering(reorderJoinPredicates(operator)) + val finalPlan = ensureDistributionAndOrdering( + newPlan :: Nil, + requiredDistribution.get :: Nil, + Seq(Nil), + shuffleOrigin) + assert(finalPlan.size == 1) + finalPlan.head + } else { + newPlan + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 4471fda654c0..548ba8706084 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1908,6 +1908,74 @@ class AdaptiveQueryExecSuite } } + test("SPARK-33832: Support optimize skew join even if introduce extra shuffle") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.SHUFFLE_PARTITIONS.key -> "10", + SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN.key -> "true") { + withTempView("skewData1", "skewData2") { + spark + .range(0, 1000, 1, 10) + .selectExpr("id % 3 as key1", "id as value1") + .createOrReplaceTempView("skewData1") + spark + .range(0, 1000, 1, 10) + .selectExpr("id % 1 as key2", "id as value2") + .createOrReplaceTempView("skewData2") + + // check if optimized skewed join does not satisfy the required distribution + Seq(true, false).foreach { hasRequiredDistribution => + Seq(true, false).foreach { hasPartitionNumber => + val 
repartition = if (hasRequiredDistribution) { + s"/*+ repartition(${ if (hasPartitionNumber) "10," else ""}key1) */" + } else { + "" + } + + // check required distribution and extra shuffle + val (_, adaptive1) = + runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + + s"JOIN skewData2 ON key1 = key2 GROUP BY key1") + val shuffles1 = collect(adaptive1) { + case s: ShuffleExchangeExec => s + } + assert(shuffles1.size == 3) + // shuffles1.head is the top-level shuffle under the Aggregate operator + assert(shuffles1.head.shuffleOrigin == ENSURE_REQUIREMENTS) + val smj1 = findTopLevelSortMergeJoin(adaptive1) + assert(smj1.size == 1 && smj1.head.isSkewJoin) + + // only check required distribution + val (_, adaptive2) = + runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + + s"JOIN skewData2 ON key1 = key2") + val shuffles2 = collect(adaptive2) { + case s: ShuffleExchangeExec => s + } + if (hasRequiredDistribution) { + assert(shuffles2.size == 3) + val finalShuffle = shuffles2.head + if (hasPartitionNumber) { + assert(finalShuffle.shuffleOrigin == REPARTITION_BY_NUM) + } else { + assert(finalShuffle.shuffleOrigin == REPARTITION_BY_COL) + } + } else { + assert(shuffles2.size == 2) + } + val smj2 = findTopLevelSortMergeJoin(adaptive2) + assert(smj2.size == 1 && smj2.head.isSkewJoin) + } + } + } + } + } + test("SPARK-35968: AQE coalescing should not produce too small partitions by default") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptive) =