Closed pull request: 40 commits (changes shown from 1 commit).
39ba441
spark-9104 first draft version
liyezhang556520 Aug 17, 2015
ecc1044
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Aug 17, 2015
2101538
show N/A for nio
liyezhang556520 Aug 17, 2015
9ccaf88
handle executor add and remove event for memoryTab
liyezhang556520 Aug 18, 2015
13c17fb
show removed executors info on page
liyezhang556520 Aug 19, 2015
c9b44b1
add stage memory trace
liyezhang556520 Aug 19, 2015
984feaf
add history support for heartbeat event
liyezhang556520 Aug 20, 2015
2501c82
limit history event log frequency
liyezhang556520 Aug 20, 2015
e0ae855
add some comments for EventLoggingListener
liyezhang556520 Aug 20, 2015
7491279
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Aug 20, 2015
424c172
scala style fix
liyezhang556520 Aug 20, 2015
f21a804
remove executor port and fix test failure
liyezhang556520 Aug 21, 2015
2f3d30b
merge apache/master after master updated
liyezhang556520 Sep 25, 2015
7b846a2
work with JavaConverters
liyezhang556520 Oct 9, 2015
41874aa
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Oct 9, 2015
0531d0f
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Oct 29, 2015
27b7da1
refine the code according to Imran's comments and the design doc
liyezhang556520 Nov 2, 2015
a8fcf74
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 2, 2015
f2f0e64
fix scala style test
liyezhang556520 Nov 3, 2015
5f7a999
capitalize class name
liyezhang556520 Nov 3, 2015
5ad7a6a
change task metrics json format back to origin
liyezhang556520 Nov 3, 2015
c836fb9
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 3, 2015
b5aa4da
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 5, 2015
e8e2bdd
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 6, 2015
1dffa29
according to Imran's comment, refine the code
liyezhang556520 Nov 17, 2015
75e63c3
add first test case
liyezhang556520 Nov 17, 2015
0c1241c
fix scala style
liyezhang556520 Nov 17, 2015
c78628e
add more test cases, with event logging test left
liyezhang556520 Nov 19, 2015
a93bd96
scala style fix
liyezhang556520 Nov 19, 2015
89214f3
fix test failure and add event logging unit test
liyezhang556520 Nov 23, 2015
1ed48c1
scala style
liyezhang556520 Nov 23, 2015
cb307aa
merge to apache/master branch, fix merge conflict
liyezhang556520 Nov 24, 2015
b438077
roll back useless change
liyezhang556520 Nov 24, 2015
4123ac7
modify the code according to Imran's comments, mainly with unit test
liyezhang556520 Dec 8, 2015
2ce9fd9
fix scala style
liyezhang556520 Dec 8, 2015
17d094e
merge to master branch with tests update
liyezhang556520 Dec 8, 2015
4b3dbe4
change port to option and some bug fixes
liyezhang556520 Dec 9, 2015
0ea7cab
address comments of code refinement
liyezhang556520 Jan 12, 2016
5e031ce
merge to latest master branch from spark-9104-draft
liyezhang556520 Jan 12, 2016
87f8172
fix import ordering error
liyezhang556520 Jan 12, 2016
fix test failure and add event logging unit test
liyezhang556520 committed Nov 23, 2015
commit 89214f342d37daea4f27d08d08cee145e63f3eab
@@ -41,6 +41,12 @@ class ExecutorMetrics extends Serializable {
  private[spark] def setTransportMetrics(value: TransportMetrics) = {
    _transportMetrics = value
  }

  // for test only
  def metricsDetails = {
    (hostname, transportMetrics.timeStamp, transportMetrics.onHeapSize,
      transportMetrics.offHeapSize)
  }
}

/**
@@ -32,7 +32,7 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.TransportMetrics
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}

@@ -169,10 +169,9 @@ private[spark] class EventLoggingListener(
  // in {{executorIdToLatestMetrics}}.
  private def updateAndLogExecutorMemoryMetrics() : Unit = {
    executorIdToModifiedMaxMetrics.foreach { case(_, metrics) => logEvent(metrics) }
    // Clear the modified metrics map after each log action
    executorIdToModifiedMaxMetrics.clear()
    executorIdToLatestMetrics.foreach {case(_, metrics) => logEvent(metrics) }
    executorIdToLatestMetrics.foreach { case (executorId, metrics) =>
      executorIdToModifiedMaxMetrics.update(executorId, metrics)
    }
  }
Contributor:
I'd rename this to updateAndLogExecutorMemoryMetrics or something like that, to be a little more specific. I'd also change the first sentence of the comment to something like:

When a stage is submitted and completed, we update our executor memory metrics for that stage, and then log the metrics. Anytime we receive more executor metrics, we update our running set of {{maxMetrics}} and {{latestMetrics}}.

I don't understand the last two sentences of the comment -- can you expand on that?

Finally, you should use foreach, and you can use case to extract the parts you want and make it a little clearer:

modifiedMetrics.foreach { case (_, metrics) => logEvent(metrics) }
latestMetrics.foreach { case (executorId, metrics) => modifiedMetrics.update(executorId, metrics) }
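
For readers unfamiliar with the pattern, here is a minimal self-contained sketch of the same foreach-with-case destructuring; the map name and contents are made up for illustration, not from the patch:

import scala.collection.mutable

val metricsByExec = mutable.Map("exec-1" -> 10, "exec-2" -> 20)
// the case clause destructures each (key, value) pair into named parts
metricsByExec.foreach { case (execId, size) => println(s"$execId -> $size") }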

Contributor Author:
> I don't understand the last two sentences of the comment -- can you expand on that?

I'll update the code according to the design doc. I think the current code is not quite correct; please refer to the design doc.


  // Events that do not trigger a flush
@@ -234,8 +233,12 @@ private[spark] class EventLoggingListener(

  // No-op because logging every update would be overkill
  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    executorIdToLatestMetrics.update(event.execId, event)
    updateModifiedMetrics(event.execId)
    // In order to avoid the logged event consumes too much storage size, taskMetrics would not
    // be logged into event log file currently
    val lightEvent = SparkListenerExecutorMetricsUpdate(
Contributor:
The comment above the method is inaccurate (this is no longer a no-op, obviously). Can you change it to something like "Track executor metrics for logging on stage start and end"?

I'd also update the inner comment to something like "We only track the executor metrics in each stage, so we drop the task metrics as they are quite verbose", and maybe rename "lightEvent" to "eventWithoutTaskMetrics"?

      event.execId, event.executorMetrics, Seq.empty)
    executorIdToLatestMetrics.update(lightEvent.execId, lightEvent)
Contributor:
nit: it's more idiomatic Scala to write map.update(key, value) as map(key) = value (Scala's syntactic sugar for the update method).
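
A tiny illustration of that sugar (the map here is hypothetical, not from the patch); both forms compile to the same call:

import scala.collection.mutable

val latestMetrics = mutable.HashMap[String, Int]()
latestMetrics.update("exec-1", 1)  // explicit call to the update method
latestMetrics("exec-1") = 2        // syntactic sugar; desugars to update("exec-1", 2)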

    updateModifiedMetrics(lightEvent.execId)
  }

  /**
@@ -288,8 +291,15 @@ private[spark] class EventLoggingListener(
      } else {
        toBeModTransMetrics.offHeapSize
      }
      toBeModifiedEvent.executorMetrics.setTransportMetrics(
        TransportMetrics(timeStamp, onHeapSize, offHeapSize))

      // We should maintain a new instance for each update to avoid side-effect
      val modifiedExecMetrics = new ExecutorMetrics()
      modifiedExecMetrics.setHostname(toBeModifiedEvent.executorMetrics.hostname)
      modifiedExecMetrics.setTransportMetrics(TransportMetrics(
        timeStamp, onHeapSize, offHeapSize))
      val modifiedEvent = SparkListenerExecutorMetricsUpdate(
        toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.taskMetrics)
      executorIdToModifiedMaxMetrics.update(executorId, modifiedEvent)
    }
  }
}
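
The side effect the fresh instance avoids is ordinary mutable-object aliasing. A minimal sketch of the failure mode, using a toy Box class as a stand-in for the mutable ExecutorMetrics (illustration only, not Spark code):

class Box(var value: Int)

val latest = new Box(10)
val storedMax = latest              // an alias, not a copy
latest.value = 5                    // mutating through one reference...
assert(storedMax.value == 5)        // ...silently changes the stored "max"
val safeMax = new Box(latest.value) // a fresh instance per update avoids this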
@@ -122,6 +122,105 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
    "a fine:mind$dollar{bills}.1", None, Some("lz4")))
}

test("test event logger logging executor metrics") {
import org.apache.spark.scheduler.cluster._
import org.apache.spark.ui.memory._
val conf = EventLoggingListenerSuite.getLoggingConf(testDirPath)
val eventLogger = new EventLoggingListener("test-memListener", None, testDirPath.toUri(), conf)
val execId = "exec-1"
val hostName = "host-1"

eventLogger.start()
eventLogger.onExecutorAdded(SparkListenerExecutorAdded(
0L, execId, new ExecutorInfo(hostName, 1, Map.empty)))

// stage 1 and stage 2 submitted
eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1))
eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2))
val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(hostName, 1L, 20, 10)
eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId, execMetrics1))
val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(hostName, 2L, 30, 10)
eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId, execMetrics2))
// stage1 completed
eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1))
// stage3 submitted
eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(3))
val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(hostName, 3L, 30, 30)
eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId, execMetrics3))
val execMetrics4 = MemoryListenerSuite.createExecutorMetrics(hostName, 4L, 20, 25)
eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId, execMetrics4))
// stage 2 completed
eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2))
val execMetrics5 = MemoryListenerSuite.createExecutorMetrics(hostName, 5L, 15, 15)
eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId, execMetrics5))
val execMetrics6 = MemoryListenerSuite.createExecutorMetrics(hostName, 6L, 25, 10)
eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId, execMetrics6))
// stage 3 completed
eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(3))

eventLogger.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId, ""))

// Totally there are 15 logged events, including:
// 2 events of executor Added/Removed
// 6 events of stage Submitted/Completed
// 7 events of executorMetrics update (3 combined metrics and 4 original metrics)
assert(eventLogger.loggedEvents.size === 15)
eventLogger.stop()

val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
val lines = readLines(logData)
Utils.tryWithSafeFinally {
// totally there are 15 lines, including SparkListenerLogStart event and 14 other events
Contributor:
The comment is wrong (off by one). Maybe just make the comment "one extra line for SparkListenerLogStart".

Contributor Author:
Yes, I forgot to update the comment, thanks.

      assert(lines.size === 16)

      // 4 executor metrics that are the latest metrics updated before stage submit and complete
      val jsonMetrics = JsonProtocol.sparkEventFromJson(parse(lines(5)))
Contributor:
I'm not a fan of pulling out very specific lines of the log here -- it makes it harder for the reader to follow, and also makes the tests more brittle. Could you instead have a util method like getLastExecutorMetricBeforeStageEnd(events: Seq[SparkListenerEvent], stageId: Int): SparkListenerExecutorMetricsUpdate? Then your checks would be clearer; they'd look like:

val parsedLines = lines.map { line => JsonProtocol.sparkEventFromJson(parse(line)) }
...
checkExecutorMetrics(
  metrics = getLastExecutorMetricBeforeStageEnd(parsedLines, 3),
  expMetrics = ...
)

(not quite the right args, but hopefully that conveys the idea). You'd also need to make sure the stage end events had a completion time in there to be able to grab the right event.
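
A rough sketch of what that helper could look like; the name and signature follow the reviewer's suggestion (returning an Option here rather than a bare event), not the actual patch:

def getLastExecutorMetricBeforeStageEnd(
    events: Seq[SparkListenerEvent],
    stageId: Int): Option[SparkListenerExecutorMetricsUpdate] = {
  events
    .takeWhile {
      // scan the events in order, stopping at the requested stage's completion
      case end: SparkListenerStageCompleted => end.stageInfo.stageId != stageId
      case _ => true
    }
    .collect { case m: SparkListenerExecutorMetricsUpdate => m }
    .lastOption
}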

Contributor:
Another idea: write an integration test which pumps the parsed events back into the MemoryListener and makes sure it has the right status for each stage. I actually think that might be a better way to gain confidence.
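
A possible shape for that integration test; MemoryListener is assumed from this patch, while ReplayListenerBus, its replay(InputStream, String) method, and openEventLog are existing Spark test utilities:

val replayBus = new ReplayListenerBus()
val memoryListener = new MemoryListener    // assumed from this patch
replayBus.addListener(memoryListener)
val replayData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
Utils.tryWithSafeFinally {
  // feed the logged events back through the listener
  replayBus.replay(replayData, eventLogger.logPath)
} {
  replayData.close()
}
// then assert that memoryListener recorded the expected per-stage memory state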

Contributor Author:
Good idea, I'll add an integration test to make it cleaner.

      assert(Utils.getFormattedClassName(jsonMetrics) === Utils.getFormattedClassName(
        SparkListenerExecutorMetricsUpdate))
      val jsonMetrics2 = jsonMetrics.asInstanceOf[SparkListenerExecutorMetricsUpdate]
      assert((execId, (hostName, 2L, 30, 10)) === (jsonMetrics2.execId, jsonMetrics2
        .executorMetrics.metricsDetails))

      val jsonMetrics4 = JsonProtocol.sparkEventFromJson(parse(lines(7)))
        .asInstanceOf[SparkListenerExecutorMetricsUpdate]
      val jsonMetrics6 = JsonProtocol.sparkEventFromJson(parse(lines(10)))
        .asInstanceOf[SparkListenerExecutorMetricsUpdate]
      val jsonMetrics8 = JsonProtocol.sparkEventFromJson(parse(lines(13)))
        .asInstanceOf[SparkListenerExecutorMetricsUpdate]
      assert((execId, (hostName, 2L, 30, 10)) === (jsonMetrics4.execId, jsonMetrics4
        .executorMetrics.metricsDetails))
      assert((execId, (hostName, 4L, 20, 25)) === (jsonMetrics6.execId, jsonMetrics6
        .executorMetrics.metricsDetails))
      assert((execId, (hostName, 6L, 25, 10)) === (jsonMetrics8.execId, jsonMetrics8
        .executorMetrics.metricsDetails))

      // 3 executor metrics that are combined metrics updated during each time segment.
      // There is no combined metrics before "jsonMetrics4" (lines(7)) because there is no
      // metrics update between stage 1 complete and stage 3 submit. So only the last metrics
      // update will be logged.
      val jsonMetrics1 = JsonProtocol.sparkEventFromJson(parse(lines(4)))
        .asInstanceOf[SparkListenerExecutorMetricsUpdate]
      val jsonMetrics5 = JsonProtocol.sparkEventFromJson(parse(lines(9)))
        .asInstanceOf[SparkListenerExecutorMetricsUpdate]
      val jsonMetrics7 = JsonProtocol.sparkEventFromJson(parse(lines(12)))
        .asInstanceOf[SparkListenerExecutorMetricsUpdate]
      assert((execId, (hostName, 2L, 30, 10)) === (jsonMetrics1.execId, jsonMetrics1
        .executorMetrics.metricsDetails))
      assert((execId, (hostName, 3L, 30, 30)) === (jsonMetrics5.execId, jsonMetrics5
        .executorMetrics.metricsDetails))
      assert((execId, (hostName, 6L, 25, 15)) === (jsonMetrics7.execId, jsonMetrics7
        .executorMetrics.metricsDetails))
    } {
      logData.close()
    }
  }

  /* ----------------- *
   * Actual test logic *
   * ----------------- */