Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
address comment
  • Loading branch information
ulysses-you committed Jun 7, 2023
commit ee7a476c1ff48a54259662313fc4ed4f0f763060
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.adaptive

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, ListQuery, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -134,10 +135,13 @@ case class InsertAdaptiveSparkPlan(
return subqueryMap.toMap
}
plan.foreach(_.expressions.filter(_.containsPattern(PLAN_EXPRESSION)).foreach(_.foreach {
case subquery: SubqueryExpression if !subqueryMap.contains(subquery.exprId.id) =>
val executedPlan = compileSubquery(subquery.plan)
verifyAdaptivePlan(executedPlan, subquery.plan)
subqueryMap.put(subquery.exprId.id, executedPlan)
case e @ (_: expressions.ScalarSubquery | _: ListQuery | _: DynamicPruningSubquery) =>
val subquery = e.asInstanceOf[SubqueryExpression]
if (!subqueryMap.contains(subquery.exprId.id)) {
val executedPlan = compileSubquery(subquery.plan)
verifyAdaptivePlan(executedPlan, subquery.plan)
subqueryMap.put(subquery.exprId.id, executedPlan)
}
case _ =>
}))

Expand Down