22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -382,14 +382,18 @@ private[spark] object InternalAccumulator {
    * add to the same set of accumulators. We do this to report the distribution of accumulator
    * values across all tasks within each stage.
    */
-  def create(): Seq[Accumulator[Long]] = {
-    Seq(
-      // Execution memory refers to the memory used by internal data structures created
-      // during shuffles, aggregations and joins. The value of this accumulator should be
-      // approximately the sum of the peak sizes across all such data structures created
-      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-      new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-    ) ++ maybeTestAccumulator.toSeq
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
+      // Execution memory refers to the memory used by internal data structures created
+      // during shuffles, aggregations and joins. The value of this accumulator should be
+      // approximately the sum of the peak sizes across all such data structures created
+      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+      new Accumulator(
+        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+    ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
   }
 }
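Note on the change above: sc.cleaner is an Option[ContextCleaner], left empty when reference tracking is disabled, which is why the registration goes through foreach rather than get. Below is a minimal sketch of the pattern using the public accumulator API, since InternalAccumulator and registerAccumulatorForCleanup are private[spark]; the "peakExecutionMemory" name here is illustrative only:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: mirrors the shape of the diff, not callable Spark internals.
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
    val accum = sc.accumulator(0L, "peakExecutionMemory") // user-level analogue
    // In the diff, each internal accumulator is handed to the cleaner:
    //   sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
    // so the driver can drop its bookkeeping once the stage stops referencing it.
    sc.stop()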
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -81,7 +81,7 @@ private[spark] abstract class Stage(
    * accumulators here again will override partial values from the finished tasks.
    */
   def resetInternalAccumulators(): Unit = {
-    _internalAccumulators = InternalAccumulator.create()
+    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
   }
 
   /**
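The new argument is recovered from the stage's own RDD rather than a widened Stage constructor: every RDD keeps a reference to the SparkContext that created it. A standalone illustration of that invariant (a sketch, not part of this PR):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
    val rdd = sc.parallelize(1 to 10)
    // rdd.sparkContext returns the exact driver context that built the RDD,
    // which is what resetInternalAccumulators() now passes to create().
    assert(rdd.sparkContext eq sc)
    sc.stop()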
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -160,7 +160,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
   }
 
   test("internal accumulators in TaskContext") {
-    val accums = InternalAccumulator.create()
+    sc = new SparkContext("local", "test")
+    val accums = InternalAccumulator.create(sc)
     val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
     val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
     val collectedInternalAccums = taskContext.collectInternalAccumulators()
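The test now needs a live context because create(sc) registers each accumulator with the cleaner; the LocalSparkContext trait mixed into the suite stops sc after each test. One extra assertion that could be added at this point (hypothetical, not part of the PR):

    // Every internal accumulator is created with a name; the peak-execution-memory
    // accumulator should be among those returned by create(sc).
    assert(accums.exists(_.name == Some(InternalAccumulator.PEAK_EXECUTION_MEMORY)))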