113 changes: 85 additions & 28 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

import JobProgressListener._

// Define a handful of type aliases so that data structures' types can serve as documentation.
// These type aliases are public because they're used in the types of public fields:

type JobId = Int
type StageId = Int
type StageAttemptId = Int
type PoolName = String
type ExecutorId = String

// How many stages to remember
val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
// How many jobs to remember
val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
// Define all of our state:
Contributor: Nit: comment a little unnecessary given that's what this section is normally for?

// Jobs:
val activeJobs = new HashMap[JobId, JobUIData]
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]

// Stages:
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
val stageIdToInfo = new HashMap[StageId, StageInfo]

// Number of completed and failed stages, may not actually equal to completedStages.size and
// failedStages.size respectively due to completedStage and failedStages only maintain the latest
// part of the stages, the earlier ones will be removed when there are too many stages for
// memory sake.
val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
// Total of completed and failed stages that have ever been run. These may be greater than
// `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
// JobProgressListener's retention limits.
var numCompletedStages = 0
var numFailedStages = 0

// Map from pool name to a hash map (map from stage id to StageInfo).
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()

val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
// Misc:
val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
def blockManagerIds = executorIdToBlockManagerId.values.toSeq

var schedulingMode: Option[SchedulingMode] = None

def blockManagerIds = executorIdToBlockManagerId.values.toSeq
// To limit the total memory usage of JobProgressListener, we only track information for a fixed
// number of non-active jobs and stages (there is no limit for active jobs and stages):

val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
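For reference, a minimal sketch of how a user could raise these limits when building a SparkConf; the property names come from the getInt calls above, while the numeric values here are arbitrary examples:

```scala
import org.apache.spark.SparkConf

// Keep more finished jobs and stages in the UI, at the cost of extra driver memory.
val conf = new SparkConf()
  .set("spark.ui.retainedStages", "2000")
  .set("spark.ui.retainedJobs", "1000")
```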

// We can test for memory leaks by ensuring that collections that track non-active jobs and
// stages do not grow without bound and that collections for active jobs/stages eventually become
// empty once Spark is idle. Let's partition our collections into ones that should be empty
// once Spark is idle and ones that should have hard- or soft-limited sizes.
// These methods are used by unit tests, but they're defined here so that people don't forget to
// update the tests when adding new collections. Some collections have multiple levels of
// nesting, etc, so this lets us customize our notion of "size" for each structure:

// These collections should all be empty once Spark is idle (no active stages / jobs):
private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
Map(
"activeStages" -> activeStages.size,
"activeJobs" -> activeJobs.size,
"poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
)
}

Contributor: The keys in these maps don't seem to be used anywhere, any reason not to just use a list?

Contributor Author: They're used in the unit tests to print informative error messages (see line 65 of JobProgressListenerSuite); this lets us figure out which collection was non-empty.
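The author's reply above refers to assertions of the following shape (a sketch mirroring the assertActiveJobsStateIsEmpty helper that appears later in this diff); the map keys let a failure message name the collection that leaked, which a plain list of sizes could not do:

```scala
// Inside a ScalaTest suite with a `listener: JobProgressListener` in scope:
listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
  assert(size === 0, s"$fieldName was not empty")
}
```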

// These collections should stop growing once we have run at least `spark.ui.retainedStages`
// stages and `spark.ui.retainedJobs` jobs:
private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
Map(
"completedJobs" -> completedJobs.size,
"failedJobs" -> failedJobs.size,
"completedStages" -> completedStages.size,
"failedStages" -> failedStages.size
)
}

// These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
// some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
Map(
"jobIdToData" -> jobIdToData.size,
"stageIdToData" -> stageIdToData.size,
"stageIdToStageInfo" -> stageIdToInfo.size
)
}

/** If stages is too large, remove and garbage collect old stages */
private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToData.remove((s.stageId, s.attemptId))
stageIdToInfo.remove(s.stageId)
}
stages.trimStart(toRemove)
}
}

/** If jobs is too large, remove and garbage collect old jobs */
private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
if (jobs.size > retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
jobs.take(toRemove).foreach { job =>
jobIdToData.remove(job.jobId)
}
jobs.trimStart(toRemove)
}
}

Contributor: So, this kinda threw me off a bit. The code is correct and the test works as it should, but the logic is a little weird because this might remove more elements than needed to satisfy the limits.

This method is called on every change to the passed jobs list, so at most jobs.size - retainedJobs will be 1. If retainedStages >= 20, you'll remove more than the needed element to satisfy the limit.

This is fine, but it would be nice if this behavior were documented (even if it's just a comment here somewhere), and the test actually triggered it (by using a value for retainedStages that would trigger this condition, instead of 5).

Contributor Author: I agree that this is a little puzzling (this was copied over from the old code). It looks like the pattern here is essentially to create some size-limited collections with a FIFO eviction policy plus some callbacks when items are evicted. A more bulletproof approach would be to create our own size-limited collection wrapper / subclass with these eviction callbacks, since this would prevent mistakes where someone adds an item to the collection but forgets to call trim*IfNecessary. I think we should do this as part of a separate commit, though, since I want to limit the scope of this change and want to get this in now to unblock a different patch.
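A minimal sketch of the size-limited, FIFO-evicting collection with an eviction callback that the author's reply above suggests as a follow-up; the name BoundedFifoBuffer and its API are hypothetical and not part of this patch. As the reviewer notes, evicting in batches of max(maxSize / 10, 1) can drop more than one element per append once the limit is 20 or more.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical wrapper: appending trims automatically, so callers cannot forget to
// invoke trim*IfNecessary. Evicted elements are handed to a callback so the owner can
// clean up associated maps (e.g. jobIdToData / stageIdToData).
class BoundedFifoBuffer[T](maxSize: Int)(onEvict: T => Unit) {
  private val buffer = new ListBuffer[T]

  def +=(elem: T): this.type = synchronized {
    buffer += elem
    if (buffer.size > maxSize) {
      // Mirror the existing trimming behavior: evict in batches of at least one element.
      val toRemove = math.max(maxSize / 10, 1)
      buffer.take(toRemove).foreach(onEvict)
      buffer.trimStart(toRemove)
    }
    this
  }

  def size: Int = synchronized { buffer.size }
  def toList: List[T] = synchronized { buffer.toList }
}
```

JobProgressListener could then declare, for example, `completedJobs = new BoundedFifoBuffer[JobUIData](retainedJobs)(job => jobIdToData.remove(job.jobId))`, and the explicit trim helpers would no longer be needed.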

override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -92,9 +159,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
trimJobsIfNecessary(completedJobs)
jobData.status = JobExecutionStatus.SUCCEEDED
case JobFailed(exception) =>
failedJobs += jobData
trimJobsIfNecessary(failedJobs)
jobData.status = JobExecutionStatus.FAILED
}
}
@@ -118,23 +187,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (stage.failureReason.isEmpty) {
completedStages += stage
numCompletedStages += 1
trimIfNecessary(completedStages)
trimStagesIfNecessary(completedStages)
} else {
failedStages += stage
numFailedStages += 1
trimIfNecessary(failedStages)
}
}

/** If stages is too large, remove and garbage collect old stages */
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToData.remove((s.stageId, s.attemptId))
stageIdToInfo.remove(s.stageId)
}
stages.trimStart(toRemove)
trimStagesIfNecessary(failedStages)
}
}

@@ -28,32 +28,102 @@ import org.apache.spark.util.Utils

class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {

test("test LRU eviction of stages") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
val listener = new JobProgressListener(conf)

def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
SparkListenerStageSubmitted(stageInfo)
private def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
SparkListenerStageSubmitted(stageInfo)
}

private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
if (failed) {
stageInfo.failureReason = Some("Failed!")
}
SparkListenerStageCompleted(stageInfo)
}

private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
SparkListenerJobStart(jobId, stageIds)
}

def createStageEndEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
SparkListenerStageCompleted(stageInfo)
private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
SparkListenerJobEnd(jobId, result)
}

private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
val stageIds = jobId * 100 to jobId * 100 + 50
listener.onJobStart(createJobStartEvent(jobId, stageIds))
for (stageId <- stageIds) {
listener.onStageSubmitted(createStageStartEvent(stageId))
listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))
}
listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
}

private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
assert(size === 0, s"$fieldName was not empty")
}
}

test("test LRU eviction of stages") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
val listener = new JobProgressListener(conf)

for (i <- 1 to 50) {
listener.onStageSubmitted(createStageStartEvent(i))
listener.onStageCompleted(createStageEndEvent(i))
}
assertActiveJobsStateIsEmpty(listener)

listener.completedStages.size should be (5)
listener.completedStages.count(_.stageId == 50) should be (1)
listener.completedStages.count(_.stageId == 49) should be (1)
listener.completedStages.count(_.stageId == 48) should be (1)
listener.completedStages.count(_.stageId == 47) should be (1)
listener.completedStages.count(_.stageId == 46) should be (1)
listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
}

test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
conf.set("spark.ui.retainedJobs", 5.toString)
val listener = new JobProgressListener(conf)

// Run a bunch of jobs to get the listener into a state where we've exceeded both the
// job and stage retention limits:
for (jobId <- 1 to 10) {
runJob(listener, jobId, shouldFail = false)
}
for (jobId <- 200 to 210) {
runJob(listener, jobId, shouldFail = true)
}
assertActiveJobsStateIsEmpty(listener)
// Snapshot the sizes of various soft- and hard-size-limited collections:
val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
// Run some more jobs:
for (jobId <- 11 to 50) {
runJob(listener, jobId, shouldFail = false)
// We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
}

listener.completedJobs.size should be (5)
listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))

for (jobId <- 51 to 100) {
runJob(listener, jobId, shouldFail = true)
// We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
}
assertActiveJobsStateIsEmpty(listener)

// Completed and failed jobs each have their own size limits, so this should still be the same:
listener.completedJobs.size should be (5)
listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
listener.failedJobs.size should be (5)
listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
}

test("test executor id to summary") {