diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index 308e65e793d8..89e2813695a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -61,7 +61,7 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { val numExchangeAfter = numExchanges(EnsureRequirements(conf).apply(optimizedPlan)) if (numExchangeAfter > numExchangeBefore) { - logWarning("OptimizeLocalShuffleReader rule is not applied due" + + logDebug("OptimizeLocalShuffleReader rule is not applied due" + " to additional shuffles will be introduced.") plan } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index cd0bf726da9a..43802968c469 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.QueryTest import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan} -import org.apache.spark.sql.execution.adaptive.rule.CoalescedShuffleReaderExec import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf @@ -78,7 +77,7 @@ class AdaptiveQueryExecSuite } private def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = { - val localReaders = plan.collect { + val localReaders = collect(plan) { case reader: 
LocalShuffleReaderExec => reader } assert(localReaders.length === expected) @@ -164,7 +163,7 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) - + // additional shuffle exchange introduced, only one shuffle reader to local shuffle reader. checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -189,8 +188,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) - - checkNumLocalShuffleReaders(adaptivePlan, 0) + // additional shuffle exchange introduced, only one shuffle reader to local shuffle reader. + checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -214,7 +213,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) - checkNumLocalShuffleReaders(adaptivePlan, 0) + // additional shuffle exchange introduced, only one shuffle reader to local shuffle reader. + checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -229,6 +229,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 2) + checkNumLocalShuffleReaders(adaptivePlan, 2) + // Even with local shuffle reader, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.size == 1) } @@ -245,6 +247,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) + // Even with local shuffle reader, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.size == 1) } @@ -263,6 +267,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) + // Even with local shuffle reader, the query stage reuse can also work. 
val ex = findReusedExchange(adaptivePlan) assert(ex.nonEmpty) val sub = findReusedSubquery(adaptivePlan) @@ -282,6 +288,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) + // Even with local shuffle reader, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.isEmpty) val sub = findReusedSubquery(adaptivePlan) @@ -304,6 +312,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) + // Even with local shuffle reader, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.nonEmpty) assert(ex.head.plan.isInstanceOf[BroadcastQueryStageExec])