-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-13523] [SQL] Reuse exchanges in a query #11403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -239,9 +239,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT | |
| override def innerChildren: Seq[PlanType] = subqueries | ||
|
|
||
| /** | ||
| * Cleaned copy of this query plan. | ||
| * Canonicalized copy of this query plan. | ||
| */ | ||
| protected lazy val cleaned: PlanType = this | ||
| protected lazy val canonicalized: PlanType = this | ||
|
|
||
| /** | ||
| * Returns true when the given query plan will return the same results as this query plan. | ||
|
|
@@ -257,8 +257,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT | |
| * can do better should override this function. | ||
| */ | ||
| def sameResult(plan: PlanType): Boolean = { | ||
| val cleanLeft = this.cleaned | ||
| val cleanRight = plan.cleaned | ||
| val cleanLeft = this.canonicalized | ||
|
||
| val cleanRight = plan.canonicalized | ||
| cleanLeft.getClass == cleanRight.getClass && | ||
| cleanLeft.children.size == cleanRight.children.size && | ||
| cleanLeft.cleanArgs == cleanRight.cleanArgs && | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,8 @@ class SparkPlanInfo( | |
| val metrics: Seq[SQLMetricInfo]) { | ||
|
|
||
| override def hashCode(): Int = { | ||
| // hashCode of simpleString should be good enough to distinguish the plans from each other | ||
| // within a plan | ||
| simpleString.hashCode | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why doesn't this use the same fields as equals? comment
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was the intent here to avoid an expensive-to-compute recursive hashcode over children? If so, would memoization help instead?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We only need to make sure that those object equal with each other should have the same hashCode, but the hashCode does not need to considering all the members. Using simpleString here should be enough to have good hashCode (be good enough to distinguish each other within a plan) |
||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,8 +37,9 @@ abstract class Exchange extends UnaryNode { | |
| } | ||
|
|
||
| /** | ||
| * A wrapper for reused exchange to have different output, which is required to resolve the | ||
| * attributes in following plans. | ||
| * A wrapper for reused exchange to have different output, because two exchanges which produce | ||
| * logically identical output will have distinct sets of output attribute ids, so we need to | ||
| * preserve the original ids because they're what downstream operators are expecting. | ||
| */ | ||
| case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this need output or should that just be child.output
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The aggregate will have different output, even they have same result, because the Aggregate will create new ExprID for the output.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nvm.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, so just to summarize: two exchanges which produce logically identical output will have distinct sets of output attribute ids, so we need to preserve the original ids because they're what downstream operators are expecting.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, because Exchange is unary node, ReusedExchange is leaf node. ReusedExchange is similar to InMemoryColumnarTableScan |
||
|
|
||
|
|
@@ -73,15 +74,15 @@ private[sql] case class ReuseExchange(sqlContext: SQLContext) extends Rule[Spark | |
| val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question, it could be false negative. But usually if two plan have the same result, they should have the same inputs also the same plan and expressions, they should generate the same name (does not include the random ExprId).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose we can always follow up on this later if it turns out to be a problem in practice. |
||
| plan.transformUp { | ||
| case exchange: Exchange => | ||
| // the exchanges that have same results usually also have same schemas (same column names). | ||
| val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]()) | ||
| val samePlan = sameSchema.find { e => | ||
| exchange.sameResult(e) | ||
| } | ||
| if (samePlan.isDefined) { | ||
| // Keep the output of this exchange, the following plans require that to resolve | ||
| // attributes. | ||
| val reused = ReusedExchange(exchange.output, samePlan.get) | ||
| reused | ||
| ReusedExchange(exchange.output, samePlan.get) | ||
| } else { | ||
| sameSchema += exchange | ||
| exchange | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,12 +104,12 @@ case class ShuffleExchange( | |
| /** | ||
| * Caches the created ShuffleRowRDD so we can reuse that. | ||
| */ | ||
| private var shuffleRDD: ShuffledRowRDD = null | ||
| private var cachedShuffleRDD: ShuffledRowRDD = null | ||
|
|
||
| protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naive question, but do we need to cache the result after the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. attachTree is only used to generate better error message (show the plan), I think it do not matter here. |
||
| // Returns the same ShuffleRowRDD if this plan is used by multiple plans. | ||
| if (shuffleRDD == null) { | ||
| shuffleRDD = coordinator match { | ||
| if (cachedShuffleRDD == null) { | ||
| cachedShuffleRDD = coordinator match { | ||
| case Some(exchangeCoordinator) => | ||
| val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK IntelliJ might give a "suspicious variable shadowing" warning RE: this name, since
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will rename it to 'cachedShuffleRDD' |
||
| assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) | ||
|
|
@@ -119,7 +119,7 @@ case class ShuffleExchange( | |
| preparePostShuffleRDD(shuffleDependency) | ||
| } | ||
| } | ||
| shuffleRDD | ||
| cachedShuffleRDD | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,8 +104,7 @@ private[sql] object SparkPlanGraph { | |
| } else { | ||
| subgraph.nodes += node | ||
| } | ||
| // ShuffleExchange or BroadcastExchange | ||
| if (name.endsWith("Exchange")) { | ||
| if (name == "ShuffleExchange" || name == "BroadcastExchange") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nongli, just to make sure that your original comment here was addressed: were you worried about this pattern being incomplete if we add a new type of exchange? If that's the case, then the move from
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm glad to revert this change. Btw, it's hard to protect future change anyway (because of unknown). |
||
| exchanges += planInfo -> node | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nongli, is "canonicalized" sufficiently unambiguous here or do we need to explain what this means?