Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions src/main/scala/org/apache/spark/JsonRelay.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import javax.net.SocketFactory

import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import org.apache.spark.scheduler.{SparkListenerExecutorMetricsUpdate, SparkListenerEvent}
import org.apache.spark.util.{Utils, JsonProtocol}
import org.json4s.JsonAST.{JObject, JNothing, JValue}

import org.apache.spark.scheduler._
import org.apache.spark.util.{Utils, JsonProtocol}
import org.apache.spark.ui.OperationGraph

class JsonRelay(conf: SparkConf) extends SparkFirehoseListener {

val appId = conf.get("spark.app.id")
Expand All @@ -26,14 +28,22 @@ class JsonRelay(conf: SparkConf) extends SparkFirehoseListener {
var numReqs = 0
var lastEvent: Option[String] = None

// RDD operation graph events: values placed in the "Event" field of the extra graph
// messages that are relayed in addition to the regular listener events.
// Emitted once per stage when a job starts, carrying the stage's full DAG; at that point it
// is not yet known whether the stage will be skipped.
val SparkListenerGraphSubmit = "SparkListenerGraphSubmit"
// Emitted when a stage is submitted, to update that stage's "skipped" status.
val SparkListenerGraphUpdate = "SparkListenerGraphUpdate"
// Emitted for each parent of a submitted stage, to check whether that parent was skipped.
val SparkListenerGraphCheck = "SparkListenerGraphCheck"

/** Prints `s` to standard output, but only when `debugLogs` is enabled. */
def debug(s: String): Unit =
  if (debugLogs) println(s)

/**
 * Opens a new socket to `host`:`port` through `socketFactory` and wraps its output stream
 * in a writer using the `utf8` charset. Overwrites the current `socket` and `writer`; the
 * previous socket is not closed here.
 */
def initSocketAndWriter(): Unit = {
  // Log through debug() only: an additional unconditional println of the same message
  // would duplicate the line and ignore the debugLogs setting.
  debug("*** JsonRelay: initializing socket ***")
  socket = socketFactory.createSocket(host, port)
  writer = new OutputStreamWriter(socket.getOutputStream, utf8)
}
Expand Down Expand Up @@ -76,6 +86,28 @@ class JsonRelay(conf: SparkConf) extends SparkFirehoseListener {
case j => throw new Exception(s"Non-object SparkListenerEvent $j")
}

// == DAG event processing ==
// Builds the extra graph messages (compact JSON strings) to relay for this event. Every
// message carries the 'appId' and an 'Event' field naming the kind of graph message.
//
// `SparkListenerJobStart`: the DAG of every stage in the job is extracted and emitted as a
// `SparkListenerGraphSubmit` message, one per stage.
//
// `SparkListenerStageSubmitted`: the scheduler has committed to executing the stage, so a
// `SparkListenerGraphUpdate` message is emitted for it. A `SparkListenerGraphCheck` message
// is also emitted for each parent of the stage, to check whether that parent has been
// submitted as well. If a child stage is reported as submitted before its parent, the parent
// will be marked as skipped, but it will still be updated as runnable by a subsequent
// request.
//
// Any other event produces no graph messages.
val graphEvents: Seq[String] = event match {
// One submit message per stage of the starting job.
case jobStart: SparkListenerJobStart => jobStart.stageInfos.map { stageInfo =>
compact(OperationGraph.makeJsonStageGraph(stageInfo, jobStart.jobId) ~
("appId" -> appId) ~ ("Event" -> SparkListenerGraphSubmit))
}
// One check message per parent stage, followed by the update message for the stage itself.
case stageSubmitted: SparkListenerStageSubmitted =>
stageSubmitted.stageInfo.parentIds.map { parentStageId =>
compact(OperationGraph.makeJsonStageGraphCheck(parentStageId) ~
("appId" -> appId) ~ ("Event" -> SparkListenerGraphCheck))
} :+ compact(OperationGraph.makeJsonStageGraphUpdate(stageSubmitted.stageInfo) ~
("appId" -> appId) ~ ("Event" -> SparkListenerGraphUpdate))
// Events without graph information contribute nothing.
case otherEvent => Seq.empty
}

val s: String = compact(jv)

numReqs += 1
Expand All @@ -87,14 +119,17 @@ class JsonRelay(conf: SparkConf) extends SparkFirehoseListener {
debug(s"*** Socket is closed... ***")
}
writer.write(s)
// write DAGs, if available
graphEvents.foreach(writer.write)
writer.flush()
} catch {
case e: SocketException =>
socket.close()
initSocketAndWriter()
println(s"*** JsonRelay re-sending: $lastEvent and $s ***")
debug(s"*** JsonRelay re-sending: $lastEvent and $s and DAGs ***")
lastEvent.foreach(writer.write)
writer.write(s)
graphEvents.foreach(writer.write)
writer.flush()
debug("Socket re-sent")
}
Expand Down
66 changes: 66 additions & 0 deletions src/main/scala/org/apache/spark/ui/OperationGraph.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.apache.spark.ui

import org.apache.spark.scheduler.{SparkListenerJobStart, StageInfo}
import org.apache.spark.ui.scope._

import org.json4s.JsonDSL._
import org.json4s.JsonAST.JObject

/**
 * Builds JSON objects describing the RDD operation graph of a stage, derived from a
 * [[StageInfo]] through [[RDDOperationGraph]], together with the small status messages
 * used to mark stages as submitted.
 * `node.callsite` is currently not included, to keep compatibility with Spark 1.5.x.
 */
object OperationGraph {

  /** JSON for a single graph edge: the ids it connects. */
  private def edgeToJson(edge: RDDOperationEdge): JObject =
    ("fromId" -> edge.fromId) ~ ("toId" -> edge.toId)

  /** JSON for a single graph node: its id, name and cached flag. */
  private def nodeToJson(node: RDDOperationNode): JObject =
    ("rddId" -> node.id) ~ ("name" -> node.name) ~ ("cached" -> node.cached)

  /**
   * Describe the RDD graph of a stage as JSON: job id, stage id, the dot file of the graph,
   * the cached nodes of the root cluster, and the incoming/outgoing edges. Both status
   * flags, `submitted` and `childSubmitted`, start out as `false`.
   * @param stageInfo stage to describe
   * @param jobId id of the job the stage belongs to
   * @return JSON object with the fields listed above
   */
  def makeJsonStageGraph(stageInfo: StageInfo, jobId: Int): JObject = {
    val operationGraph = RDDOperationGraph.makeOperationGraph(stageInfo)
    val dot = RDDOperationGraph.makeDotFile(operationGraph)
    val outgoing = operationGraph.outgoingEdges.map(edgeToJson(_))
    val incoming = operationGraph.incomingEdges.map(edgeToJson(_))
    val cachedRdds = operationGraph.rootCluster.getCachedNodes.map(nodeToJson(_))

    // No stage counts as submitted when its job starts, hence both flags are false.
    ("jobId" -> jobId) ~
      ("stageId" -> stageInfo.stageId) ~
      ("dotFile" -> dot) ~
      ("cachedRDDs" -> cachedRdds) ~
      ("submitted" -> false) ~
      ("childSubmitted" -> false) ~
      ("incomingEdges" -> incoming) ~
      ("outgoingEdges" -> outgoing)
  }

  /**
   * Build the message reporting that a child of the given stage has been submitted. It is
   * mainly used to find out whether the (parent) stage was skipped.
   * @param stageId id of the stage to check
   * @return JSON object with the stage id and `childSubmitted` set to `true`
   */
  def makeJsonStageGraphCheck(stageId: Int): JObject =
    ("stageId" -> stageId) ~ ("childSubmitted" -> true)

  /**
   * Build the update message for a stage. The job id is left out because the stage id is
   * unique per application. At present the only update is marking the stage as submitted,
   * i.e. not skipped.
   * @param stageInfo stage being updated
   * @return JSON object with the stage id and `submitted` set to `true`
   */
  def makeJsonStageGraphUpdate(stageInfo: StageInfo): JObject =
    ("stageId" -> stageInfo.stageId) ~ ("submitted" -> true)
}