Skip to content

Commit d608f99

Browse files
cenyuhai authored and wgtmac committed
[SPARK-17171][WEB UI] DAG will list all partitions in the graph
## What changes were proposed in this pull request? The DAG view lists all partitions in the graph; it is too slow and makes it hard to see the whole graph. Usually we don't want to see all partitions — we just want to see the relations of the DAG graph. So I just show 2 root nodes for RDDs. Before this PR, the DAG graph looks like [dag1.png](https://issues.apache.org/jira/secure/attachment/12824702/dag1.png), [dag3.png](https://issues.apache.org/jira/secure/attachment/12825456/dag3.png); after this PR, the DAG graph looks like [dag2.png](https://issues.apache.org/jira/secure/attachment/12824703/dag2.png), [dag4.png](https://issues.apache.org/jira/secure/attachment/12825457/dag4.png). Author: cenyuhai <cenyuhai@didichuxing.com> Author: 岑玉海 <261810726@qq.com> Closes apache#14737 from cenyuhai/SPARK-17171.
1 parent 7afab77 commit d608f99

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils
2626

2727
import org.apache.spark.internal.Logging
2828
import org.apache.spark.scheduler.StageInfo
29-
import org.apache.spark.storage.StorageLevel
29+
import org.apache.spark.storage.{RDDInfo, StorageLevel}
3030

3131
/**
3232
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -107,7 +107,7 @@ private[ui] object RDDOperationGraph extends Logging {
107107
* supporting in the future if we decide to group certain stages within the same job under
108108
* a common scope (e.g. part of a SQL query).
109109
*/
110-
def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
110+
def makeOperationGraph(stage: StageInfo, retainedNodes: Int): RDDOperationGraph = {
111111
val edges = new ListBuffer[RDDOperationEdge]
112112
val nodes = new mutable.HashMap[Int, RDDOperationNode]
113113
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
@@ -119,18 +119,37 @@ private[ui] object RDDOperationGraph extends Logging {
119119
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
120120
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
121121

122+
var rootNodeCount = 0
123+
val addRDDIds = new mutable.HashSet[Int]()
124+
val dropRDDIds = new mutable.HashSet[Int]()
125+
122126
// Find nodes, edges, and operation scopes that belong to this stage
123-
stage.rddInfos.foreach { rdd =>
124-
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
127+
stage.rddInfos.sortBy(_.id).foreach { rdd =>
128+
val parentIds = rdd.parentIds
129+
val isAllowed =
130+
if (parentIds.isEmpty) {
131+
rootNodeCount += 1
132+
rootNodeCount <= retainedNodes
133+
} else {
134+
parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
135+
}
136+
137+
if (isAllowed) {
138+
addRDDIds += rdd.id
139+
edges ++= parentIds.filter(id => !dropRDDIds.contains(id)).map(RDDOperationEdge(_, rdd.id))
140+
} else {
141+
dropRDDIds += rdd.id
142+
}
125143

126144
// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
127145
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
128146
rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
129-
130147
if (rdd.scope.isEmpty) {
131148
// This RDD has no encompassing scope, so we put it directly in the root cluster
132149
// This should happen only if an RDD is instantiated outside of a public RDD API
133-
rootCluster.attachChildNode(node)
150+
if (isAllowed) {
151+
rootCluster.attachChildNode(node)
152+
}
134153
} else {
135154
// Otherwise, this RDD belongs to an inner cluster,
136155
// which may be nested inside of other clusters
@@ -154,7 +173,9 @@ private[ui] object RDDOperationGraph extends Logging {
154173
rootCluster.attachChildCluster(cluster)
155174
}
156175
}
157-
rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
176+
if (isAllowed) {
177+
rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
178+
}
158179
}
159180
}
160181

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
4141
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
4242
private[ui] val stageIds = new mutable.ArrayBuffer[Int]
4343

44+
// How many root nodes to retain in DAG Graph
45+
private[ui] val retainedNodes =
46+
conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue)
47+
4448
// How many jobs or stages to retain graph metadata for
4549
private val retainedJobs =
4650
conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
@@ -82,7 +86,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
8286
val stageId = stageInfo.stageId
8387
stageIds += stageId
8488
stageIdToJobId(stageId) = jobId
85-
stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
89+
stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo, retainedNodes)
8690
trimStagesIfNecessary()
8791
}
8892

0 commit comments

Comments (0)