diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 87cafa58d5fa6..a43c1cc0177db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -209,7 +209,7 @@ class QueryExecution( executePhase(QueryPlanningTracker.PLANNING) { // Clone the logical plan here, in case the planner rules change the states of the logical // plan. - QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone()) + QueryExecution.createSparkPlan(planner, optimizedPlan.clone()) } } @@ -574,7 +574,6 @@ object QueryExecution { * Note that the returned physical plan still needs to be prepared for execution. */ def createSparkPlan( - sparkSession: SparkSession, planner: SparkPlanner, plan: LogicalPlan): SparkPlan = { // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, @@ -594,7 +593,7 @@ object QueryExecution { * [[SparkPlan]] for execution. */ def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = { - val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone()) + val sparkPlan = createSparkPlan(spark.sessionState.planner, plan.clone()) prepareExecutedPlan(spark, sparkPlan) } @@ -603,11 +602,11 @@ object QueryExecution { * This method is only called by [[PlanAdaptiveDynamicPruningFilters]]. */ def prepareExecutedPlan( - session: SparkSession, plan: LogicalPlan, context: AdaptiveExecutionContext): SparkPlan = { - val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone()) - val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true) + val sparkPlan = createSparkPlan(context.session.sessionState.planner, plan.clone()) + val preparationRules = + preparations(context.session, Option(InsertAdaptiveSparkPlan(context)), true) prepareForExecution(preparationRules, sparkPlan.clone()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 73fc9b1fe4e2c..2855f902a8509 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -153,7 +153,7 @@ case class InsertAdaptiveSparkPlan( // Apply the same instance of this rule to sub-queries so that sub-queries all share the // same `stageCache` for Exchange reuse. this.applyInternal( - QueryExecution.createSparkPlan(adaptiveExecutionContext.session, + QueryExecution.createSparkPlan( adaptiveExecutionContext.session.sessionState.planner, plan.clone()), true) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala index 77c180b18aee0..751cfe5b7bb6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala @@ -74,9 +74,7 @@ case class PlanAdaptiveDynamicPruningFilters( val aliases = indices.map(idx => Alias(buildKeys(idx), buildKeys(idx).toString)()) val aggregate = Aggregate(aliases, aliases, buildPlan) - val session = adaptivePlan.context.session - val sparkPlan = QueryExecution.prepareExecutedPlan( - session, aggregate, adaptivePlan.context) + val sparkPlan = QueryExecution.prepareExecutedPlan(aggregate, adaptivePlan.context) assert(sparkPlan.isInstanceOf[AdaptiveSparkPlanExec]) val newAdaptivePlan = sparkPlan.asInstanceOf[AdaptiveSparkPlanExec] val values = SubqueryExec(name, newAdaptivePlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala index 5f5a9e188532e..059729d86bfaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala @@ -55,8 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp plan.transformAllExpressionsWithPruning(_.containsPattern(DYNAMIC_PRUNING_SUBQUERY)) { case DynamicPruningSubquery( value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast, exprId, _) => - val sparkPlan = QueryExecution.createSparkPlan( - sparkSession, sparkSession.sessionState.planner, buildPlan) + val sparkPlan = QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan) // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is // the first to be applied (apart from `InsertAdaptiveSparkPlan`). val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&