[SPARK-27393][SQL] Show ReusedSubquery in the plan when the subquery is reused #24258
```diff
@@ -660,21 +660,28 @@ object CoalesceExec {
 }
 
 /**
- * Physical plan for a subquery.
+ * Parent class for different types of subquery plans
  */
-case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
-
-  override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"))
+abstract class BaseSubqueryExec extends SparkPlan {
+  def name: String
```

**Member:** We need …

**Member (Author):** Both `ReusedSubqueryExec` and `SubqueryExec` have the `name`.
```diff
+  def child: SparkPlan
 
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+}
 
-  override def doCanonicalize(): SparkPlan = child.canonicalized
+/**
+ * Physical plan for a subquery.
+ */
+case class SubqueryExec(name: String, child: SparkPlan)
+  extends BaseSubqueryExec with UnaryExecNode {
+
+  override lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"))
 
   @transient
   private lazy val relationFuture: Future[Array[InternalRow]] = {
```
```diff
@@ -698,6 +705,10 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
     }(SubqueryExec.executionContext)
   }
 
+  protected override def doCanonicalize(): SparkPlan = {
+    SubqueryExec("Subquery", child.canonicalized)
+  }
+
   protected override def doPrepare(): Unit = {
     relationFuture
   }
```
```diff
@@ -715,3 +726,29 @@ object SubqueryExec {
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
 }
+
+/**
+ * A wrapper for reused [[BaseSubqueryExec]].
+ */
+case class ReusedSubqueryExec(child: BaseSubqueryExec)
+  extends BaseSubqueryExec with LeafExecNode {
+
+  override def name: String = child.name
+
+  override def output: Seq[Attribute] = child.output
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  protected override def doPrepare(): Unit = {
+    child.prepare()
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    child.execute()
+  }
+
+  override def executeCollect(): Array[InternalRow] = {
+    child.executeCollect()
+  }
+}
```
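The wrapper above is what makes the reuse visible: the second occurrence of a duplicated subquery is no longer silently replaced by the same `SubqueryExec` instance, but by a `ReusedSubqueryExec` node that delegates to it. A minimal sketch of the effect, assuming a local SparkSession and using the query from the "Reuse Subquery" test removed further down:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch (not part of the PR): run a query with two identical scalar
// subqueries and print the executed plan. With subquery reuse enabled (the
// default), one of the two subqueries should now show up as a reused node
// instead of a second full copy of the subquery plan.
val spark = SparkSession.builder().master("local[2]").appName("reused-subquery-demo").getOrCreate()
spark.range(100).selectExpr("id AS key").createOrReplaceTempView("testData")

val df = spark.sql(
  """
    |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
    |FROM testData
    |LIMIT 1
  """.stripMargin)

println(df.queryExecution.executedPlan)
```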
```diff
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression, ExprId, InSet, Literal, PlanExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
```
```diff
@@ -31,11 +31,16 @@ import org.apache.spark.sql.types.{BooleanType, DataType, StructType}
 /**
  * The base class for subquery that is used in SparkPlan.
  */
-abstract class ExecSubqueryExpression extends PlanExpression[SubqueryExec] {
+abstract class ExecSubqueryExpression extends PlanExpression[BaseSubqueryExec] {
   /**
    * Fill the expression with collected result from executed plan.
    */
   def updateResult(): Unit
+
+  override def canonicalize(attrs: AttributeSeq): ExecSubqueryExpression = {
+    withNewPlan(plan.canonicalized.asInstanceOf[BaseSubqueryExec])
+      .asInstanceOf[ExecSubqueryExpression]
+  }
 }
 
 object ExecSubqueryExpression {
```
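The new `canonicalize` override matters because `ReuseSubquery` (further down) detects duplicates with `sameResult`, which compares canonicalized plans; without canonicalizing the wrapped subquery plan, two semantically identical subquery expressions could fail to match. A hedged, standalone illustration of the `sameResult` mechanism on ordinary plans (not this PR's code; assumes a local SparkSession):

```scala
import org.apache.spark.sql.SparkSession

// Two plans built independently carry different expression IDs, but their
// canonical forms are equal, so sameResult treats them as interchangeable.
val spark = SparkSession.builder().master("local[2]").appName("same-result-demo").getOrCreate()
val p1 = spark.range(10).filter("id > 3").queryExecution.sparkPlan
val p2 = spark.range(10).filter("id > 3").queryExecution.sparkPlan
assert(p1.sameResult(p2))
```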
```diff
@@ -56,15 +61,15 @@
  * This is the physical copy of ScalarSubquery to be used inside SparkPlan.
  */
 case class ScalarSubquery(
-    plan: SubqueryExec,
+    plan: BaseSubqueryExec,
     exprId: ExprId)
   extends ExecSubqueryExpression {
 
   override def dataType: DataType = plan.schema.fields.head.dataType
   override def children: Seq[Expression] = Nil
   override def nullable: Boolean = true
   override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields)
-  override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query)
+  override def withNewPlan(query: BaseSubqueryExec): ScalarSubquery = copy(plan = query)
 
   override def semanticEquals(other: Expression): Boolean = other match {
     case s: ScalarSubquery => plan.sameResult(s.plan)
```
```diff
@@ -129,13 +134,14 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
       return plan
     }
     // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
-    val subqueries = mutable.HashMap[StructType, ArrayBuffer[SubqueryExec]]()
+    val subqueries = mutable.HashMap[StructType, ArrayBuffer[BaseSubqueryExec]]()
     plan transformAllExpressions {
       case sub: ExecSubqueryExpression =>
-        val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]())
+        val sameSchema =
+          subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[BaseSubqueryExec]())
```
**Contributor:** unnecessary change?

**Member (Author):** Changed it to `BaseSubqueryExec`.
```diff
         val sameResult = sameSchema.find(_.sameResult(sub.plan))
         if (sameResult.isDefined) {
-          sub.withNewPlan(sameResult.get)
+          sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
         } else {
           sameSchema += sub.plan
           sub
```
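The rule is a no-op when subquery reuse is disabled (the early `return plan` above). For comparing plans with and without the new wrapper, the switch is the conf behind `SQLConf.SUBQUERY_REUSE_ENABLED`, as used by the removed test below; a hedged usage sketch assuming an active `spark` session:

```scala
// Key name corresponds to SQLConf.SUBQUERY_REUSE_ENABLED; reuse is on by default.
spark.conf.set("spark.sql.execution.reuseSubquery", "false") // duplicated subqueries stay separate
spark.conf.set("spark.sql.execution.reuseSubquery", "true")  // duplicates are wrapped in ReusedSubqueryExec
```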
```diff
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
+import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
```
|
@@ -113,33 +113,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { | |
| } | ||
| } | ||
|
|
||
| test("Reuse Subquery") { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved to SubquerySuite.scala |
```diff
-    Seq(true, false).foreach { reuse =>
-      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
-        val df = sql(
-          """
-            |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
-            |FROM testData
-            |LIMIT 1
-          """.stripMargin)
-
-        import scala.collection.mutable.ArrayBuffer
-        val subqueries = ArrayBuffer[SubqueryExec]()
-        df.queryExecution.executedPlan.transformAllExpressions {
-          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
-            subqueries += plan
-            s
-        }
-
-        if (reuse) {
-          assert(subqueries.distinct.size == 1, "Subquery reusing not working correctly")
-        } else {
-          assert(subqueries.distinct.size == 2, "There should be 2 subqueries when not reusing")
-        }
-      }
-    }
-  }
-
   test("SPARK-6743: no columns from cache") {
     Seq(
       (83, 0, 38),
```
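Per the author's comment above, the removed test now lives in SubquerySuite.scala. A hedged sketch of what the relocated test could look like once it also asserts on the new wrapper node (the exact name, the imports of `SubqueryExec`, `ReusedSubqueryExec`, and `ScalarSubquery` from `org.apache.spark.sql.execution`, and the assertions are assumptions, not copied from this diff):

```scala
test("Reuse Subquery") {
  Seq(true, false).foreach { reuse =>
    withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
      val df = sql(
        """
          |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
          |FROM testData
          |LIMIT 1
        """.stripMargin)

      // Collect the subquery plans referenced by the scalar subquery expressions.
      import scala.collection.mutable.ArrayBuffer
      val subqueries = ArrayBuffer[SubqueryExec]()
      val reusedSubqueries = ArrayBuffer[ReusedSubqueryExec]()
      df.queryExecution.executedPlan.transformAllExpressions {
        case s @ ScalarSubquery(plan: SubqueryExec, _) =>
          subqueries += plan
          s
        case s @ ScalarSubquery(plan: ReusedSubqueryExec, _) =>
          reusedSubqueries += plan
          s
      }

      if (reuse) {
        // One executable subquery plus one ReusedSubqueryExec pointing at it.
        assert(subqueries.size == 1 && reusedSubqueries.size == 1,
          "The reused subquery should show up as ReusedSubqueryExec")
      } else {
        assert(subqueries.size == 2 && reusedSubqueries.isEmpty,
          "There should be 2 independent subqueries when not reusing")
      }
    }
  }
}
```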
**Reviewer:** Do we need the canonicalized plan here? It seems all the canonicalization should happen in line 69?

**Reviewer:** +1