-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-13523] [SQL] Reuse exchanges in a query #11403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions._ | |
| import org.apache.spark.sql.catalyst.trees.TreeNode | ||
| import org.apache.spark.sql.types.{DataType, StructType} | ||
|
|
||
| abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] { | ||
| abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] { | ||
| self: PlanType => | ||
|
|
||
| def output: Seq[Attribute] | ||
|
|
@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy | |
| } | ||
|
|
||
| override def innerChildren: Seq[PlanType] = subqueries | ||
|
|
||
| /** | ||
| * Canonicalized copy of this query plan. The default implementation is the plan itself. | ||
| * `sameResult` compares the canonicalized forms of two plans rather than the plans directly. | ||
| */ | ||
| protected lazy val canonicalized: PlanType = this | ||
|
|
||
| /** | ||
| * Returns true when the given query plan is known to return the same results as this | ||
| * query plan. | ||
| * | ||
| * Since it's likely undecidable to determine in general whether two plans produce the same | ||
| * results, this function may return false even when the results are actually the same. Such | ||
| * false negatives do not affect correctness, only the application of performance enhancements | ||
| * like caching. However, it must never return true if the results could possibly be different. | ||
| * | ||
| * By default this performs a modified version of equality that is tolerant of cosmetic | ||
| * differences such as attribute naming and/or expression ids. Operators that can do better | ||
| * should override this function. | ||
| */ | ||
| def sameResult(plan: PlanType): Boolean = { | ||
| // Work on the canonicalized forms of both plans. | ||
| val left = this.canonicalized | ||
| val right = plan.canonicalized | ||
| val sameShape = | ||
| left.getClass == right.getClass && left.children.size == right.children.size | ||
| // Arguments are compared through cleanArgs; children are compared pairwise and recursively. | ||
| sameShape && left.cleanArgs == right.cleanArgs && | ||
| left.children.zip(right.children).forall { case (l, r) => l.sameResult(r) } | ||
| } | ||
|
|
||
| /** | ||
| * All the attributes that are used for this plan: the output attributes of every child, | ||
| * concatenated in child order. | ||
| */ | ||
| lazy val allAttributes: Seq[Attribute] = children.flatMap(_.output) | ||
|
|
||
| private def cleanExpression(e: Expression): Expression = e match { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this related to semantic equals? I think it makes more sense to move this to Expression than here
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, you just copied this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Semantic equals cannot work with Attributes that have different ExprIds; this function can.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be named to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to keep it as |
||
| case a: Alias => | ||
| // As the root of the expression, Alias will always take an arbitrary exprId, we need | ||
| // to erase that for equality testing. | ||
| val cleanedExprId = | ||
| Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated) | ||
| BindReferences.bindReference(cleanedExprId, allAttributes, allowFailures = true) | ||
| case other => | ||
| BindReferences.bindReference(other, allAttributes, allowFailures = true) | ||
| } | ||
|
|
||
| /** | ||
| * Args that have been cleaned such that differences in expression id should not affect | ||
| * equality. | ||
| */ | ||
| protected lazy val cleanArgs: Seq[Any] = { | ||
| // Expressions are cleaned and then canonicalized; any other value is kept as is. | ||
| def cleanArg(arg: Any): Any = arg match { | ||
| case e: Expression => cleanExpression(e).canonicalized | ||
| case other => other | ||
| } | ||
|
|
||
| // Walk the constructor arguments of this node; containers are cleaned element by element. | ||
| productIterator.map { | ||
| // Children are checked using sameResult above. | ||
| case tn: TreeNode[_] if containsChild(tn) => null | ||
| case e: Expression => cleanArg(e) | ||
| case s: Option[_] => s.map(cleanArg) | ||
| case s: Seq[_] => s.map(cleanArg) | ||
| case m: Map[_, _] => m.mapValues(cleanArg) | ||
| case other => other | ||
| }.toSeq | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,11 @@ import org.apache.spark.sql.catalyst.InternalRow | |
| */ | ||
| trait BroadcastMode { | ||
| def transform(rows: Array[InternalRow]): Any | ||
|
|
||
| /** | ||
| * Returns true iff this [[BroadcastMode]] generates the same result as `other`. | ||
| */ | ||
| def compatibleWith(other: BroadcastMode): Boolean | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we consistently use the same terms: sameResult() vs. compatibleWith()? "Compatible with" also means something a bit different to me than "equals".
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. compatibleWith is not iff and not valid here. partitioning is different since it doesn't have to be both ways.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the confusion here related to what it means for broadcast modes to generate the same result, i.e. whether "same" incorporates shape vs. logical contents?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After an offline discussion with @nongli, we realized that the current one-way semantics are actually better than two-way.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't use sameResult here because a broadcast mode does not have the child plan, and sameResult requires comparing the child plans recursively. |
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -33,4 +38,8 @@ trait BroadcastMode { | |
| case object IdentityBroadcastMode extends BroadcastMode { | ||
| // TODO: pack the UnsafeRows into single bytes array. | ||
| override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows | ||
|
|
||
| // IdentityBroadcastMode is a case object, so a reference comparison is used here. | ||
| override def compatibleWith(other: BroadcastMode): Boolean = this eq other | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| package org.apache.spark.sql.execution | ||
|
|
||
| import org.apache.spark.annotation.DeveloperApi | ||
| import org.apache.spark.sql.execution.exchange.ReusedExchange | ||
| import org.apache.spark.sql.execution.metric.SQLMetricInfo | ||
| import org.apache.spark.util.Utils | ||
|
|
||
|
|
@@ -31,13 +32,28 @@ class SparkPlanInfo( | |
| val simpleString: String, | ||
| val children: Seq[SparkPlanInfo], | ||
| val metadata: Map[String, String], | ||
| val metrics: Seq[SQLMetricInfo]) | ||
| val metrics: Seq[SQLMetricInfo]) { | ||
|
|
||
| override def hashCode(): Int = { | ||
| // hashCode of simpleString should be good enough to distinguish the plans from each other | ||
| // within a plan | ||
| simpleString.hashCode | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why doesn't this use the same fields as equals? comment
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was the intent here to avoid an expensive-to-compute recursive hashcode over children? If so, would memoization help instead?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We only need to make sure that objects that are equal to each other have the same hashCode; the hashCode does not need to consider all the members. Using simpleString here should be enough to get a good hashCode (good enough to distinguish the nodes from each other within a plan). |
||
| } | ||
|
|
||
| // Two infos are equal when nodeName, simpleString and children all match. | ||
| override def equals(other: Any): Boolean = other match { | ||
| case that: SparkPlanInfo => | ||
| val sameNode = nodeName == that.nodeName && simpleString == that.simpleString | ||
| sameNode && children == that.children | ||
| case _ => false | ||
| } | ||
| } | ||
|
|
||
| private[sql] object SparkPlanInfo { | ||
|
|
||
| def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { | ||
|
|
||
| val children = plan.children ++ plan.subqueries | ||
| val children = plan match { | ||
| case ReusedExchange(_, child) => child :: Nil | ||
| case _ => plan.children ++ plan.subqueries | ||
| } | ||
| val metrics = plan.metrics.toSeq.map { case (key, metric) => | ||
| new SQLMetricInfo(metric.name.getOrElse(key), metric.id, | ||
| Utils.getFormattedClassName(metric.param)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.exchange | ||
|
|
||
| import scala.collection.mutable | ||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.broadcast | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.SQLContext | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.Attribute | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} | ||
| import org.apache.spark.sql.types.StructType | ||
|
|
||
| /** | ||
| * An interface for exchanges. An exchange is a unary node whose output attributes are | ||
| * exactly the output attributes of its child. | ||
| */ | ||
| abstract class Exchange extends UnaryNode { | ||
| override def output: Seq[Attribute] = child.output | ||
| } | ||
|
|
||
| /** | ||
| * A wrapper for reused exchange to have different output, because two exchanges which produce | ||
| * logically identical output will have distinct sets of output attribute ids, so we need to | ||
| * preserve the original ids because they're what downstream operators are expecting. | ||
| */ | ||
| case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this need output or should that just be child.output
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The aggregates will have different output even if they have the same result, because each Aggregate will create new ExprIds for its output.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nvm.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, so just to summarize: two exchanges which produce logically identical output will have distinct sets of output attribute ids, so we need to preserve the original ids because they're what downstream operators are expecting.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, because Exchange is a unary node while ReusedExchange is a leaf node. ReusedExchange is similar to InMemoryColumnarTableScan. |
||
|
|
||
| // This wrapper is ignored for comparison purposes: the wrapped exchange is what gets | ||
| // compared. The call is made on `plan` (reversed order) because `plan` could itself be a | ||
| // ReusedExchange. | ||
| override def sameResult(plan: SparkPlan): Boolean = plan.sameResult(child) | ||
|
|
||
| // Execution is delegated to the wrapped exchange. | ||
| def doExecute(): RDD[InternalRow] = child.execute() | ||
|
|
||
| // Broadcast execution is delegated to the wrapped exchange as well. | ||
| override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = | ||
| child.executeBroadcast() | ||
|
|
||
| // Report no tree children so that the same tree (the wrapped exchange's subtree) is not | ||
| // repeated in explain. | ||
| override def treeChildren: Seq[SparkPlan] = Nil | ||
| } | ||
|
|
||
| /** | ||
| * Find out duplicated exchanges in the spark plan, then use the same exchange for all the | ||
| * references. | ||
| */ | ||
| private[sql] case class ReuseExchange(sqlContext: SQLContext) extends Rule[SparkPlan] { | ||
|
|
||
| def apply(plan: SparkPlan): SparkPlan = { | ||
| if (!sqlContext.conf.exchangeReuseEnabled) { | ||
| return plan | ||
| } | ||
| // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls. | ||
| val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question, it could produce false negatives. But usually, if two plans have the same result, they should also have the same inputs, the same plan and the same expressions, so they should generate the same names (not including the random ExprIds).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose we can always follow up on this later if it turns out to be a problem in practice. |
||
| plan.transformUp { | ||
| case exchange: Exchange => | ||
| // the exchanges that have same results usually also have same schemas (same column names). | ||
| val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]()) | ||
| val samePlan = sameSchema.find { e => | ||
| exchange.sameResult(e) | ||
| } | ||
| if (samePlan.isDefined) { | ||
| // Keep the output of this exchange, the following plans require that to resolve | ||
| // attributes. | ||
| ReusedExchange(exchange.output, samePlan.get) | ||
| } else { | ||
| sameSchema += exchange | ||
| exchange | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,7 @@ import org.apache.spark.util.MutablePair | |
| case class ShuffleExchange( | ||
| var newPartitioning: Partitioning, | ||
| child: SparkPlan, | ||
| @transient coordinator: Option[ExchangeCoordinator]) extends UnaryNode { | ||
| @transient coordinator: Option[ExchangeCoordinator]) extends Exchange { | ||
|
|
||
| override def nodeName: String = { | ||
| val extraInfo = coordinator match { | ||
|
|
@@ -55,8 +55,6 @@ case class ShuffleExchange( | |
|
|
||
| override def outputPartitioning: Partitioning = newPartitioning | ||
|
|
||
| override def output: Seq[Attribute] = child.output | ||
|
|
||
| private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) | ||
|
|
||
| override protected def doPrepare(): Unit = { | ||
|
|
@@ -103,16 +101,25 @@ case class ShuffleExchange( | |
| new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices) | ||
| } | ||
|
|
||
| /** | ||
| * Caches the created ShuffledRowRDD so that it can be reused. It is null until the RDD has | ||
| * been created. | ||
| */ | ||
| private var cachedShuffleRDD: ShuffledRowRDD = null | ||
|
|
||
| protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naive question, but do we need to cache the result after the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. attachTree is only used to generate a better error message (it shows the plan), so I don't think it matters here. |
||
| coordinator match { | ||
| case Some(exchangeCoordinator) => | ||
| val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) | ||
| assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) | ||
| shuffleRDD | ||
| case None => | ||
| val shuffleDependency = prepareShuffleDependency() | ||
| preparePostShuffleRDD(shuffleDependency) | ||
| // Returns the same ShuffleRowRDD if this plan is used by multiple plans. | ||
| if (cachedShuffleRDD == null) { | ||
| cachedShuffleRDD = coordinator match { | ||
| case Some(exchangeCoordinator) => | ||
| val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK IntelliJ might give a "suspicious variable shadowing" warning RE: this name, since
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will rename it to 'cachedShuffleRDD' |
||
| assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) | ||
| shuffleRDD | ||
| case None => | ||
| val shuffleDependency = prepareShuffleDependency() | ||
| preparePostShuffleRDD(shuffleDependency) | ||
| } | ||
| } | ||
| cachedShuffleRDD | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nongli, is "canonicalized" sufficiently unambiguous here or do we need to explain what this means?