@@ -114,8 +114,7 @@ public BypassMergeSortShuffleWriter(
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = new ShuffleWriteMetrics();
taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.serializer = Serializer.getSerializer(dep.serializer());
this.shuffleBlockResolver = shuffleBlockResolver;
}
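For orientation, a minimal Scala sketch (not part of the patch) of the call-site change in this hunk and the two below: instead of constructing a ShuffleWriteMetrics and assigning it back through the generated shuffleWriteMetrics_$eq setter, the writer asks TaskMetrics for its instance. ExampleWriter is hypothetical; only the TaskMetrics calls are taken from this diff, and registerShuffleWriteMetrics() is private[spark], so code like this only compiles inside Spark itself.

    // Hypothetical writer showing the old and new wiring.
    import org.apache.spark.TaskContext
    import org.apache.spark.executor.ShuffleWriteMetrics

    class ExampleWriter(taskContext: TaskContext) {
      // Old pattern (removed above): construct locally, then push into TaskMetrics:
      //   writeMetrics = new ShuffleWriteMetrics()
      //   taskContext.taskMetrics().shuffleWriteMetrics_$eq(Some(writeMetrics))
      // New pattern: TaskMetrics owns the instance and hands it out on request.
      private val writeMetrics: ShuffleWriteMetrics =
        taskContext.taskMetrics().registerShuffleWriteMetrics()
    }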
@@ -119,8 +119,7 @@ public UnsafeShuffleWriter(
this.shuffleId = dep.shuffleId();
this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
this.partitioner = dep.partitioner();
this.writeMetrics = new ShuffleWriteMetrics();
taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
@@ -122,9 +122,7 @@ private UnsafeExternalSorter(
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
// TODO: metrics tracking + integration with shuffle write metrics
// need to connect the write metrics to task metrics so we count the spill IO somewhere.
this.writeMetrics = new ShuffleWriteMetrics();
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();

if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(
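The TODO removed in this hunk is addressed by the same call: registerShuffleWriteMetrics() is get-or-create, so a spilling sorter and the shuffle writer running in the same task share one ShuffleWriteMetrics, and spill I/O is counted in the task's shuffle write metrics. A hedged sketch of that sharing, assuming Spark-internal access and a taskContext in scope:

    // Sketch of the get-or-create semantics only, not of UnsafeExternalSorter itself.
    val tm = taskContext.taskMetrics()
    val writerMetrics = tm.registerShuffleWriteMetrics()  // e.g. from the shuffle writer
    val sorterMetrics = tm.registerShuffleWriteMetrics()  // e.g. from the spilling sorter
    assert(writerMetrics eq sorterMetrics)                // same instance, so spill bytes accumulate
    assert(tm.shuffleWriteMetrics.exists(_ eq writerMetrics))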
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -43,8 +43,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod)
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
@@ -66,11 +65,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// Otherwise, cache the values
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)

} finally {
loading.synchronized {
loading.remove(key)
@@ -425,7 +425,7 @@ private[spark] class Executor(
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.foreach { metrics =>
metrics.updateShuffleReadMetrics()
metrics.mergeShuffleReadMetrics()
metrics.updateInputMetrics()
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
metrics.updateAccumulators()
180 changes: 120 additions & 60 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -102,14 +102,37 @@ class TaskMetrics extends Serializable {
private[spark] def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value
private[spark] def decDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled -= value

/**
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
private var _inputMetrics: Option[InputMetrics] = None

/**
* Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
* data, defined only in tasks with input.
*/
def inputMetrics: Option[InputMetrics] = _inputMetrics

/**
* Get or create a new [[InputMetrics]] associated with this task.
*/
private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = {
synchronized {
val metrics = _inputMetrics.getOrElse {
val metrics = new InputMetrics(readMethod)
_inputMetrics = Some(metrics)
metrics
}
// If there already exists an InputMetric with the same read method, we can just return
// that one. Otherwise, if the read method is different from the one previously seen by
// this task, we return a new dummy one to avoid clobbering the values of the old metrics.
// In the future we should try to store input metrics from all different read methods at
// the same time (SPARK-5225).
Review comment (Contributor):
Hmm, that's unfortunate. Since it's a pre-existing problem, it's fine to not fix it here.

Review comment (Contributor):
Before, I guess we'd unconditionally overwrite so we might handle it wrong even for the same read method case? This looks good to me, just curious RE: whether this fixed another bug.

if (metrics.readMethod == readMethod) {
metrics
} else {
new InputMetrics(readMethod)
}
}
}
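A hedged sketch of the behavior documented above and raised in the review comments: registering with the read method already stored on the task returns that InputMetrics, while a different read method gets a detached instance so the existing values are not clobbered (SPARK-5225). Assumes Spark-internal access and a taskContext in scope.

    // Sketch: same read method => shared instance; different read method => throwaway dummy.
    import org.apache.spark.executor.DataReadMethod
    val tm = taskContext.taskMetrics()
    val hadoop1 = tm.registerInputMetrics(DataReadMethod.Hadoop)
    val hadoop2 = tm.registerInputMetrics(DataReadMethod.Hadoop)
    assert(hadoop1 eq hadoop2)                    // bytes read accumulate in one place
    val fromMemory = tm.registerInputMetrics(DataReadMethod.Memory)
    assert(!(fromMemory eq hadoop1))              // dummy, not stored on the task
    assert(tm.inputMetrics.exists(_.readMethod == DataReadMethod.Hadoop))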

/**
* This should only be used when recreating TaskMetrics, not when updating input metrics in
* executors
@@ -118,18 +141,37 @@ class TaskMetrics extends Serializable {
_inputMetrics = inputMetrics
}

private var _outputMetrics: Option[OutputMetrics] = None

/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
* data was written are stored here.
* Metrics related to writing data externally (e.g. to a distributed filesystem),
* defined only in tasks with output.
*/
var outputMetrics: Option[OutputMetrics] = None
def outputMetrics: Option[OutputMetrics] = _outputMetrics

@deprecated("setting OutputMetrics is for internal use only", "2.0.0")
Review comment (Contributor):
Do you know whether there's any third-party code that calls this? Wondering if we can just drop it. Also, AFAIK this would only be for source-compatibility: code which directly set outputMetrics will be binary-incompatible with this.

def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
_outputMetrics = om
}

/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
* This includes read metrics aggregated over all the task's shuffle dependencies.
* Get or create a new [[OutputMetrics]] associated with this task.
*/
private[spark] def registerOutputMetrics(
writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized {
_outputMetrics.getOrElse {
val metrics = new OutputMetrics(writeMethod)
_outputMetrics = Some(metrics)
metrics
}
}
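The output side follows the same get-or-create shape. A minimal hedged sketch, assuming DataWriteMethod.Hadoop (defined alongside these classes) and Spark-internal access:

    // Sketch: repeated registration reuses the single OutputMetrics held by the task.
    import org.apache.spark.executor.DataWriteMethod
    val tm = taskContext.taskMetrics()
    val out1 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
    val out2 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
    assert(out1 eq out2)                          // both callers write into the same metrics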

private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

/**
* Metrics related to shuffle read aggregated across all shuffle dependencies.
* This is defined only if there are shuffle dependencies in this task.
*/
def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

/**
@@ -141,66 +183,35 @@ class TaskMetrics extends Serializable {
}

/**
* ShuffleReadMetrics per dependency for collecting independently while task is in progress.
*/
@transient private lazy val depsShuffleReadMetrics: ArrayBuffer[ShuffleReadMetrics] =
new ArrayBuffer[ShuffleReadMetrics]()

/**
* If this task writes to shuffle output, metrics on the written shuffle data will be collected
* here
*/
var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None

/**
* Storage statuses of any blocks that have been updated as a result of this task.
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

/**
* Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
*
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
* issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
* dependency, and merge these metrics before reporting them to the driver. This method returns
* a ShuffleReadMetrics for a dependency and registers it for merging later.
*/
private [spark] def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
val readMetrics = new ShuffleReadMetrics()
depsShuffleReadMetrics += readMetrics
readMetrics
}
* issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
* each dependency and merge these metrics before reporting them to the driver.
*/
@transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]

/**
* Returns the input metrics object that the task should use. Currently, if
* there exists an input metric with the same readMethod, we return that one
* so the caller can accumulate bytes read. If the readMethod is different
* than previously seen by this task, we return a new InputMetric but don't
* record it.
* Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency.
*
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
* All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which
* merges the temporary values synchronously. Otherwise, all temporary data collected will
* be lost.
*/
private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): InputMetrics = {
synchronized {
_inputMetrics match {
case None =>
val metrics = new InputMetrics(readMethod)
_inputMetrics = Some(metrics)
metrics
case Some(metrics @ InputMetrics(method)) if method == readMethod =>
metrics
case Some(InputMetrics(method)) =>
new InputMetrics(readMethod)
}
}
private[spark] def registerTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
val readMetrics = new ShuffleReadMetrics
tempShuffleReadMetrics += readMetrics
readMetrics
}

/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
* Merge values across all temporary [[ShuffleReadMetrics]] into `_shuffleReadMetrics`.
* This is expected to be called on executor heartbeat and at the end of a task.
*/
private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
if (!depsShuffleReadMetrics.isEmpty) {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
val merged = new ShuffleReadMetrics
for (depMetrics <- tempShuffleReadMetrics) {
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
@@ -212,6 +223,55 @@ class TaskMetrics extends Serializable {
}
}
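A hedged sketch of the register-then-merge flow described above, using only the incrementers and getters visible in this diff; assumes Spark-internal access and a taskContext in scope:

    // Sketch: one temporary ShuffleReadMetrics per shuffle dependency / reader thread,
    // merged into the task-level shuffleReadMetrics on heartbeat or at task completion.
    val tm = taskContext.taskMetrics()
    val dep0 = tm.registerTempShuffleReadMetrics()
    val dep1 = tm.registerTempShuffleReadMetrics()
    dep0.incLocalBlocksFetched(3)
    dep1.incRemoteBlocksFetched(5)
    tm.mergeShuffleReadMetrics()                  // without this, the temporary values are never reported
    assert(tm.shuffleReadMetrics.exists(m => m.localBlocksFetched == 3 && m.remoteBlocksFetched == 5))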

private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None

/**
* Metrics related to shuffle write, defined only in shuffle map stages.
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics

@deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
_shuffleWriteMetrics = swm
}

/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized {
_shuffleWriteMetrics.getOrElse {
val metrics = new ShuffleWriteMetrics
_shuffleWriteMetrics = Some(metrics)
metrics
}
}

private var _updatedBlockStatuses: Seq[(BlockId, BlockStatus)] =
Seq.empty[(BlockId, BlockStatus)]

/**
* Storage statuses of any blocks that have been updated as a result of this task.
*/
def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses

@deprecated("setting updated blocks is for internal use only", "2.0.0")
def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
_updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)])
}

private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
_updatedBlockStatuses ++= v
}

private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
_updatedBlockStatuses = v
}

@deprecated("use updatedBlockStatuses instead", "2.0.0")
def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None
}
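A hedged sketch of the updated-block-status accessors introduced above; someBlockId and someBlockStatus stand in for an existing BlockId and BlockStatus, whose construction is not shown in this diff:

    // Sketch: statuses accumulate through the private[spark] mutators; the deprecated
    // updatedBlocks view wraps the same Seq in a non-empty Option.
    val tm = taskContext.taskMetrics()
    tm.incUpdatedBlockStatuses(Seq(someBlockId -> someBlockStatus))
    assert(tm.updatedBlockStatuses.nonEmpty)
    assert(tm.updatedBlocks == Some(tm.updatedBlockStatuses))  // deprecated view, same data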

private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -212,7 +212,7 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)

// Sets the thread local variable for the file's name
split.inputSplit.value match {
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -129,8 +129,7 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = getConf

val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes