[SPARK-7046] Remove InputMetrics from BlockResult
The BlockResult class originally contained an InputMetrics object so that it could be used
directly as the InputMetrics for the whole task. We now copy the fields out of it instead, and
the embedded object is confusing because it holds only partial input metrics (it doesn't
include the records read). Because this object is no longer useful (and is confusing), it
should be removed.
kayousterhout committed Apr 22, 2015
commit a08ca1982b1e56f30a1e177b9f6f173aff5b8d4f
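
In short, the patch replaces the eagerly built metrics object with two plain fields that callers fold into the task-level metrics themselves. A minimal sketch of the access pattern, condensed from the diffs below (`blockResult` and `context` are the names used in CacheManager; the sketch itself is not code from the patch):

```scala
// Before: callers read a partially populated InputMetrics off the BlockResult.
val bytesRead = blockResult.inputMetrics.bytesRead
val readMethod = blockResult.inputMetrics.readMethod

// After: callers read the raw fields and update the task-level metrics
// directly, so no partial InputMetrics object is created along the way.
val existingMetrics = context.taskMetrics
  .getInputMetricsForReadMethod(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)
```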
core/src/main/scala/org/apache/spark/CacheManager.scala (2 additions, 3 deletions)
```diff
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.incBytesRead(inputMetrics.bytesRead)
+          .getInputMetricsForReadMethod(blockResult.readMethod)
+        existingMetrics.incBytesRead(blockResult.bytes)

         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
```
core/src/main/scala/org/apache/spark/storage/BlockManager.scala

```diff
@@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
-    readMethod: DataReadMethod.Value,
-    bytes: Long) {
-  val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
-}
+    val readMethod: DataReadMethod.Value,
+    val bytes: Long)

 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
```
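
As a quick illustration of the slimmed-down class, a hypothetical construction (the iterator contents and byte count are made up for the example; only `BlockResult`, `DataReadMethod`, and the field names come from the code above):

```scala
// BlockResult is now a plain holder: the fetched data plus the two values
// callers need in order to update input metrics themselves.
val result = new BlockResult(Iterator("a", "b"), DataReadMethod.Memory, 1024L)
assert(result.readMethod == DataReadMethod.Memory)
assert(result.bytes == 1024L)
```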
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

```diff
@@ -428,19 +428,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
-    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
-    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list1Get.get.bytes === list1SizeEstimate)
+    assert(list1Get.get.readMethod === DataReadMethod.Memory)
     val list2MemoryGet = store.get("list2memory")
     assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
     assert(list2MemoryGet.get.data.size === 3)
-    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
-    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list2MemoryGet.get.bytes === list2SizeEstimate)
+    assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2disk expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
-    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
-    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+    assert(list2DiskGet.get.bytes > 0)
+    assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }

   test("in-memory LRU storage") {
```