From be72e81a6726289f8dd9ba99271ff0ba47f67cdb Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 19 Nov 2014 14:41:58 -0800
Subject: [PATCH 1/2] [SPARK-4495] Fix memory leaks in JobProgressListener
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit fixes a memory leak in JobProgressListener that I introduced in
SPARK-2321 and adds a testing framework to ensure that it’s very difficult to
inadvertently introduce new memory leaks.
---
 .../spark/ui/jobs/JobProgressListener.scala   | 113 +++++++++++++-----
 .../ui/jobs/JobProgressListenerSuite.scala    | 100 +++++++++++++---
 2 files changed, 170 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8bbde51e1801..0bf96245b3f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

   import JobProgressListener._

+  // Define a handful of type aliases so that data structures' types can serve as documentation.
+  // These type aliases are public because they're used in the types of public fields:
+
   type JobId = Int
   type StageId = Int
   type StageAttemptId = Int
+  type PoolName = String
+  type ExecutorId = String

-  // How many stages to remember
-  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
-  // How many jobs to remember
-  val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+  // Define all of our state:

+  // Jobs:
   val activeJobs = new HashMap[JobId, JobUIData]
   val completedJobs = ListBuffer[JobUIData]()
   val failedJobs = ListBuffer[JobUIData]()
   val jobIdToData = new HashMap[JobId, JobUIData]

+  // Stages:
   val activeStages = new HashMap[StageId, StageInfo]
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
   val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
   val stageIdToInfo = new HashMap[StageId, StageInfo]
-
-  // Number of completed and failed stages, may not actually equal to completedStages.size and
-  // failedStages.size respectively due to completedStage and failedStages only maintain the latest
-  // part of the stages, the earlier ones will be removed when there are too many stages for
-  // memory sake.
+  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
+  // Total of completed and failed stages that have ever been run. These may be greater than
+  // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
+  // JobProgressListener's retention limits.
   var numCompletedStages = 0
   var numFailedStages = 0

-  // Map from pool name to a hash map (map from stage id to StageInfo).
-  val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
-
-  val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
+  // Misc:
+  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
+  def blockManagerIds = executorIdToBlockManagerId.values.toSeq

   var schedulingMode: Option[SchedulingMode] = None

-  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+  // To limit the total memory usage of JobProgressListener, we only track information for a fixed
+  // number of non-active jobs and stages (there is no limit for active jobs and stages):
+
+  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+
+  // We can test for memory leaks by ensuring that data structures that track non-active jobs and
+  // stages do not grow without bound and that structures for active jobs/stages eventually become
+  // empty once Spark is idle. Let's partition our data structures into ones that should be empty
+  // once Spark is idle and ones that should have a bounded size. These methods are used by unit
+  // tests, but they're defined here so that people don't forget to update the tests when adding
+  // new fields. Some data structures have multiple levels of nesting, etc, so this lets us
+  // customize our notion of "size" for each structure:
+
+  // These sizes should all be 0 once Spark is idle (no active stages / jobs):
+  private[spark] def getSizesOfActiveJobDataStructures: Map[String, Int] = {
+    Map(
+      "activeStages" -> activeStages.size,
+      "activeJobs" -> activeJobs.size,
+      "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
+    )
+  }
+
+  // These sizes should stop growing once we have run at least `spark.ui.retainedStages` stages
+  // and `spark.ui.retainedJobs` jobs:
+  private[spark] def getSizesOfHardSizeLimitedDataStructures: Map[String, Int] = {
+    Map(
+      "completedJobs" -> completedJobs.size,
+      "failedJobs" -> failedJobs.size,
+      "completedStages" -> completedStages.size,
+      "failedStages" -> failedStages.size
+    )
+  }
+
+  // These sizes may grow arbitrarily, but once Spark becomes idle they should shrink back to
+  // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
+  private[spark] def getSizesOfSoftSizeLimitedDataStructures: Map[String, Int] = {
+    Map(
+      "jobIdToData" -> jobIdToData.size,
+      "stageIdToData" -> stageIdToData.size,
+      "stageIdToStageInfo" -> stageIdToInfo.size
+    )
+  }
+
+  /** If stages is too large, remove and garbage collect old stages */
+  private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
+    if (stages.size > retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stages.take(toRemove).foreach { s =>
+        stageIdToData.remove((s.stageId, s.attemptId))
+        stageIdToInfo.remove(s.stageId)
+      }
+      stages.trimStart(toRemove)
+    }
+  }
+
+  /** If jobs is too large, remove and garbage collect old jobs */
+  private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
+    if (jobs.size > retainedJobs) {
+      val toRemove = math.max(retainedJobs / 10, 1)
+      jobs.take(toRemove).foreach { job =>
+        jobIdToData.remove(job.jobId)
+      }
+      jobs.trimStart(toRemove)
+    }
+  }

   override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
     val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -92,9 +159,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData
+        trimJobsIfNecessary(completedJobs)
         jobData.status = JobExecutionStatus.SUCCEEDED
       case JobFailed(exception) =>
         failedJobs += jobData
+        trimJobsIfNecessary(failedJobs)
         jobData.status = JobExecutionStatus.FAILED
     }
   }
@@ -118,23 +187,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     if (stage.failureReason.isEmpty) {
       completedStages += stage
       numCompletedStages += 1
-      trimIfNecessary(completedStages)
+      trimStagesIfNecessary(completedStages)
     } else {
       failedStages += stage
       numFailedStages += 1
-      trimIfNecessary(failedStages)
-    }
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
-    if (stages.size > retainedStages) {
-      val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s =>
-        stageIdToData.remove((s.stageId, s.attemptId))
-        stageIdToInfo.remove(s.stageId)
-      }
-      stages.trimStart(toRemove)
+      trimStagesIfNecessary(failedStages)
     }
   }

diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7c102cc7f404..d2060263769a 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,32 +28,102 @@ import org.apache.spark.util.Utils

 class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {

-  test("test LRU eviction of stages") {
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
-    def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
-      SparkListenerStageSubmitted(stageInfo)
+  private def createStageStartEvent(stageId: Int) = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    SparkListenerStageSubmitted(stageInfo)
+  }
+
+  private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    if (failed) {
+      stageInfo.failureReason = Some("Failed!")
     }
+    SparkListenerStageCompleted(stageInfo)
+  }
+
+  private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+    SparkListenerJobStart(jobId, stageIds)
+  }

-    def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
-      SparkListenerStageCompleted(stageInfo)
+  private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
+    val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
+    SparkListenerJobEnd(jobId, result)
+  }
+
+  private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
+    val stageIds = jobId * 100 to jobId * 100 + 50
+    listener.onJobStart(createJobStartEvent(jobId, stageIds))
+    for (stageId <- stageIds) {
+      listener.onStageSubmitted(createStageStartEvent(stageId))
+      listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))
+    }
+    listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
+  }
+
+  private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
+    listener.getSizesOfActiveJobDataStructures.foreach { case (fieldName, size) =>
+      assert(size === 0, s"$fieldName was not empty")
     }
+  }
+
+  test("test LRU eviction of stages") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)

     for (i <- 1 to 50) {
       listener.onStageSubmitted(createStageStartEvent(i))
       listener.onStageCompleted(createStageEndEvent(i))
     }

+    assertActiveJobsStateIsEmpty(listener)
     listener.completedStages.size should be (5)
-    listener.completedStages.count(_.stageId == 50) should be (1)
-    listener.completedStages.count(_.stageId == 49) should be (1)
-    listener.completedStages.count(_.stageId == 48) should be (1)
-    listener.completedStages.count(_.stageId == 47) should be (1)
-    listener.completedStages.count(_.stageId == 46) should be (1)
+    listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
+  }
+
+  test("test LRU eviction of jobs") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    conf.set("spark.ui.retainedJobs", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    // Run a bunch of jobs to get the listener into a state where we've exceeded both the
+    // job and stage retention limits:
+    for (jobId <- 1 to 10) {
+      runJob(listener, jobId, shouldFail = false)
+    }
+    for (jobId <- 200 to 210) {
+      runJob(listener, jobId, shouldFail = true)
+    }
+    assertActiveJobsStateIsEmpty(listener)
+    // Snapshot the sizes of various soft- and hard-size-limited collections:
+    val softLimitSizes = listener.getSizesOfSoftSizeLimitedDataStructures
+    val hardLimitSizes = listener.getSizesOfHardSizeLimitedDataStructures
+    // Run some more jobs:
+    for (jobId <- 11 to 50) {
+      runJob(listener, jobId, shouldFail = false)
+      // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+      listener.getSizesOfSoftSizeLimitedDataStructures should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedDataStructures should be (hardLimitSizes)
+    }
+
+    listener.completedJobs.size should be (5)
+    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+
+    for (jobId <- 51 to 100) {
+      runJob(listener, jobId, shouldFail = true)
+      // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+      listener.getSizesOfSoftSizeLimitedDataStructures should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedDataStructures should be (hardLimitSizes)
+    }
+    assertActiveJobsStateIsEmpty(listener)
+
+    // Completed and failed jobs each their own size limits, so this should still be the same:
+    listener.completedJobs.size should be (5)
+    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+    listener.failedJobs.size should be (5)
+    listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
   }

   test("test executor id to summary") {

From c73fab5b1419a3870f8b84407b3e29ab87f238a8 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 19 Nov 2014 14:50:37 -0800
Subject: [PATCH 2/2] "data structures" -> collections

---
 .../spark/ui/jobs/JobProgressListener.scala   | 30 +++++++++----------
 .../ui/jobs/JobProgressListenerSuite.scala    | 14 ++++-----
 2 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0bf96245b3f6..ccdcf0e047f4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -82,16 +82,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
   val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

-  // We can test for memory leaks by ensuring that data structures that track non-active jobs and
-  // stages do not grow without bound and that structures for active jobs/stages eventually become
-  // empty once Spark is idle. Let's partition our data structures into ones that should be empty
-  // once Spark is idle and ones that should have a bounded size. These methods are used by unit
-  // tests, but they're defined here so that people don't forget to update the tests when adding
-  // new fields. Some data structures have multiple levels of nesting, etc, so this lets us
-  // customize our notion of "size" for each structure:
-
-  // These sizes should all be 0 once Spark is idle (no active stages / jobs):
-  private[spark] def getSizesOfActiveJobDataStructures: Map[String, Int] = {
+  // We can test for memory leaks by ensuring that collections that track non-active jobs and
+  // stages do not grow without bound and that collections for active jobs/stages eventually become
+  // empty once Spark is idle. Let's partition our collections into ones that should be empty
+  // once Spark is idle and ones that should have a hard- or soft-limited sizes.
+  // These methods are used by unit tests, but they're defined here so that people don't forget to
+  // update the tests when adding new collections. Some collections have multiple levels of
+  // nesting, etc, so this lets us customize our notion of "size" for each structure:
+
+  // These collections should all be empty once Spark is idle (no active stages / jobs):
+  private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
     Map(
       "activeStages" -> activeStages.size,
       "activeJobs" -> activeJobs.size,
@@ -99,9 +99,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     )
   }

-  // These sizes should stop growing once we have run at least `spark.ui.retainedStages` stages
-  // and `spark.ui.retainedJobs` jobs:
-  private[spark] def getSizesOfHardSizeLimitedDataStructures: Map[String, Int] = {
+  // These collections should stop growing once we have run at least `spark.ui.retainedStages`
+  // stages and `spark.ui.retainedJobs` jobs:
+  private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
     Map(
       "completedJobs" -> completedJobs.size,
       "failedJobs" -> failedJobs.size,
@@ -110,9 +110,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     )
   }

-  // These sizes may grow arbitrarily, but once Spark becomes idle they should shrink back to
+  // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
   // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
-  private[spark] def getSizesOfSoftSizeLimitedDataStructures: Map[String, Int] = {
+  private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
     Map(
       "jobIdToData" -> jobIdToData.size,
       "stageIdToData" -> stageIdToData.size,
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index d2060263769a..15c5b4e702ef 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -62,7 +62,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
   }

   private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
-    listener.getSizesOfActiveJobDataStructures.foreach { case (fieldName, size) =>
+    listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
       assert(size === 0, s"$fieldName was not empty")
     }
   }
@@ -98,14 +98,14 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     }
     assertActiveJobsStateIsEmpty(listener)
     // Snapshot the sizes of various soft- and hard-size-limited collections:
-    val softLimitSizes = listener.getSizesOfSoftSizeLimitedDataStructures
-    val hardLimitSizes = listener.getSizesOfHardSizeLimitedDataStructures
+    val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
+    val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
     // Run some more jobs:
     for (jobId <- 11 to 50) {
       runJob(listener, jobId, shouldFail = false)
       // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
-      listener.getSizesOfSoftSizeLimitedDataStructures should be (softLimitSizes)
-      listener.getSizesOfHardSizeLimitedDataStructures should be (hardLimitSizes)
+      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
     }

     listener.completedJobs.size should be (5)
@@ -114,8 +114,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     for (jobId <- 51 to 100) {
       runJob(listener, jobId, shouldFail = true)
       // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
-      listener.getSizesOfSoftSizeLimitedDataStructures should be (softLimitSizes)
-      listener.getSizesOfHardSizeLimitedDataStructures should be (hardLimitSizes)
+      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
     }
     assertActiveJobsStateIsEmpty(listener)