-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-13415] [SQL] Visualize subquery in SQL web UI #11417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 2 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
24757f1
Visualize subquery in SQL web UI
bc2c66b
fix tests
faba044
Merge branch 'master' of github.com:apache/spark into viz_subquery
cff0871
address comments
43dba95
Merge branch 'master' of github.com:apache/spark into viz_subquery
3293994
fix conflict
e69262a
update doc string
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -453,6 +453,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { | |
| */ | ||
| protected def treeChildren: Seq[BaseType] = children | ||
|
|
||
| /** | ||
| * All the nodes that are part of this node. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. All the nodes that are? How is this different from children? |
||
| */ | ||
| protected def innerChildren: Seq[BaseType] = Nil | ||
|
|
||
| /** | ||
| * Appends the string representation of this node and its children to the given StringBuilder. | ||
| * | ||
|
|
@@ -475,6 +480,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { | |
| builder.append(simpleString) | ||
| builder.append("\n") | ||
|
|
||
| if (innerChildren.nonEmpty) { | ||
| innerChildren.init.foreach(_.generateTreeString( | ||
| depth + 2, lastChildren :+ false :+ false, builder)) | ||
| innerChildren.last.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder) | ||
| } | ||
|
|
||
| if (treeChildren.nonEmpty) { | ||
| treeChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder)) | ||
| treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,8 +17,6 @@ | |
|
|
||
| package org.apache.spark.sql.execution | ||
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.broadcast | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.SQLContext | ||
|
|
@@ -29,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning | |
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.catalyst.util.toCommentSafeString | ||
| import org.apache.spark.sql.execution.aggregate.TungstenAggregate | ||
| import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, BuildLeft, BuildRight, SortMergeJoin} | ||
| import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin} | ||
| import org.apache.spark.sql.execution.metric.LongSQLMetricValue | ||
|
|
||
| /** | ||
|
|
@@ -163,16 +161,12 @@ trait CodegenSupport extends SparkPlan { | |
| * This is the leaf node of a tree with WholeStageCodegen, which is used to generate code that consumes | ||
| * an RDD iterator of InternalRow. | ||
| */ | ||
| case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport { | ||
| case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport { | ||
|
|
||
| override def output: Seq[Attribute] = child.output | ||
| override def outputPartitioning: Partitioning = child.outputPartitioning | ||
| override def outputOrdering: Seq[SortOrder] = child.outputOrdering | ||
|
|
||
| override def doPrepare(): Unit = { | ||
| child.prepare() | ||
| } | ||
|
|
||
| override def doExecute(): RDD[InternalRow] = { | ||
| child.execute() | ||
| } | ||
|
|
@@ -181,8 +175,6 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport { | |
| child.doExecuteBroadcast() | ||
| } | ||
|
|
||
| override def supportCodegen: Boolean = false | ||
|
|
||
| override def upstreams(): Seq[RDD[InternalRow]] = { | ||
| child.execute() :: Nil | ||
| } | ||
|
|
@@ -210,6 +202,8 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport { | |
| } | ||
|
|
||
| override def simpleString: String = "INPUT" | ||
|
|
||
| override def treeChildren: Seq[SparkPlan] = Nil | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -243,30 +237,23 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport { | |
| * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input, | ||
| * used to generated code for BoundReference. | ||
| */ | ||
| case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan]) | ||
| extends SparkPlan with CodegenSupport { | ||
|
|
||
| override def supportCodegen: Boolean = false | ||
|
|
||
| override def output: Seq[Attribute] = plan.output | ||
| override def outputPartitioning: Partitioning = plan.outputPartitioning | ||
| override def outputOrdering: Seq[SortOrder] = plan.outputOrdering | ||
| case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport { | ||
|
|
||
| override def doPrepare(): Unit = { | ||
| plan.prepare() | ||
| } | ||
| override def output: Seq[Attribute] = child.output | ||
| override def outputPartitioning: Partitioning = child.outputPartitioning | ||
| override def outputOrdering: Seq[SortOrder] = child.outputOrdering | ||
|
|
||
| override def doExecute(): RDD[InternalRow] = { | ||
| val ctx = new CodegenContext | ||
| val code = plan.produce(ctx, this) | ||
| val code = child.asInstanceOf[CodegenSupport].produce(ctx, this) | ||
| val references = ctx.references.toArray | ||
| val source = s""" | ||
| public Object generate(Object[] references) { | ||
| return new GeneratedIterator(references); | ||
| } | ||
|
|
||
| /** Codegened pipeline for: | ||
| * ${toCommentSafeString(plan.treeString.trim)} | ||
| * ${toCommentSafeString(child.treeString.trim)} | ||
| */ | ||
| class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { | ||
|
|
||
|
|
@@ -294,7 +281,7 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan]) | |
| // println(s"${CodeFormatter.format(cleanedSource)}") | ||
| CodeGenerator.compile(cleanedSource) | ||
|
|
||
| val rdds = plan.upstreams() | ||
| val rdds = child.asInstanceOf[CodegenSupport].upstreams() | ||
| assert(rdds.size <= 2, "Up to two upstream RDDs can be supported") | ||
| if (rdds.length == 1) { | ||
| rdds.head.mapPartitions { iter => | ||
|
|
@@ -362,33 +349,20 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan]) | |
| } | ||
|
|
||
| private[sql] override def resetMetrics(): Unit = { | ||
| plan.foreach(_.resetMetrics()) | ||
| child.foreach(_.resetMetrics()) | ||
| } | ||
|
|
||
| override def generateTreeString( | ||
| depth: Int, | ||
| lastChildren: Seq[Boolean], | ||
| builder: StringBuilder): StringBuilder = { | ||
| if (depth > 0) { | ||
| lastChildren.init.foreach { isLast => | ||
| val prefixFragment = if (isLast) " " else ": " | ||
| builder.append(prefixFragment) | ||
| } | ||
|
|
||
| val branch = if (lastChildren.last) "+- " else ":- " | ||
| builder.append(branch) | ||
| } | ||
|
|
||
| builder.append(simpleString) | ||
| builder.append("\n") | ||
| override def innerChildren: Seq[SparkPlan] = { | ||
| child :: Nil | ||
| } | ||
|
|
||
| plan.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder) | ||
| if (children.nonEmpty) { | ||
| children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder)) | ||
| children.last.generateTreeString(depth + 1, lastChildren :+ true, builder) | ||
| } | ||
| private def collectDirectInputs(plan: SparkPlan): Seq[SparkPlan] = plan match { | ||
|
||
| case InputAdapter(c) => c :: Nil | ||
| case other => other.children.flatMap(collectDirectInputs) | ||
| } | ||
|
|
||
| builder | ||
| override def treeChildren: Seq[SparkPlan] = { | ||
| collectDirectInputs(child) | ||
| } | ||
|
|
||
| override def simpleString: String = "WholeStageCodegen" | ||
|
|
@@ -416,27 +390,34 @@ private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Ru | |
| case _ => false | ||
| } | ||
|
|
||
| /** | ||
| * Inserts a InputAdapter on top of those that do not support codegen. | ||
| */ | ||
| private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match { | ||
| case j @ SortMergeJoin(_, _, _, left, right) => | ||
| // The children of SortMergeJoin should do codegen separately. | ||
| j.copy(left = InputAdapter(insertWholeStageCodegen(left)), | ||
| right = InputAdapter(insertWholeStageCodegen(right))) | ||
| case p if !supportCodegen(p) => | ||
| // collapse them recursively | ||
| InputAdapter(insertWholeStageCodegen(p)) | ||
| case p => | ||
| p.withNewChildren(p.children.map(insertInputAdapter)) | ||
| } | ||
|
|
||
| /** | ||
| * Inserts a WholeStageCodegen on top of those that support codegen. | ||
| */ | ||
| private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match { | ||
| case plan: CodegenSupport if supportCodegen(plan) => | ||
| WholeStageCodegen(insertInputAdapter(plan)) | ||
| case other => | ||
| other.withNewChildren(other.children.map(insertWholeStageCodegen)) | ||
| } | ||
|
|
||
| def apply(plan: SparkPlan): SparkPlan = { | ||
| if (sqlContext.conf.wholeStageEnabled) { | ||
| plan.transform { | ||
| case plan: CodegenSupport if supportCodegen(plan) => | ||
| var inputs = ArrayBuffer[SparkPlan]() | ||
| val combined = plan.transform { | ||
| // The build side can't be compiled together | ||
| case b @ BroadcastHashJoin(_, _, _, BuildLeft, _, left, right) => | ||
| b.copy(left = apply(left)) | ||
| case b @ BroadcastHashJoin(_, _, _, BuildRight, _, left, right) => | ||
| b.copy(right = apply(right)) | ||
| case j @ SortMergeJoin(_, _, _, left, right) => | ||
| // The children of SortMergeJoin should do codegen separately. | ||
| j.copy(left = apply(left), right = apply(right)) | ||
| case p if !supportCodegen(p) => | ||
| val input = apply(p) // collapse them recursively | ||
| inputs += input | ||
| InputAdapter(input) | ||
| }.asInstanceOf[CodegenSupport] | ||
| WholeStageCodegen(combined, inputs) | ||
| } | ||
| insertWholeStageCodegen(plan) | ||
| } else { | ||
| plan | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is
treeChildren? Its doc in TreeNode does not really show the difference between it and children. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the doc string of treeChildren.