Changes from 1 commit (of the 51 commits in this pull request, listed below):
1e752f1  Added unpersist method to Broadcast. (Feb 5, 2014)
80dd977  Fix for Broadcast unpersist patch. (Feb 6, 2014)
e427a9e  Added ContextCleaner to automatically clean RDDs and shuffles when th… (tdas, Feb 14, 2014)
8512612  Changed TimeStampedHashMap to use WrappedJavaHashMap. (tdas, Feb 14, 2014)
a24fefc  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas, Mar 11, 2014)
cb0a5a6  Fixed docs and styles. (tdas, Mar 11, 2014)
ae9da88  Removed unncessary TimeStampedHashMap from DAGScheduler, added try-ca… (tdas, Mar 12, 2014)
e61daa0  Modifications based on the comments on PR 126. (tdas, Mar 13, 2014)
a7260d3  Added try-catch in context cleaner and null value cleaning in TimeSta… (tdas, Mar 17, 2014)
892b952  Removed use of BoundedHashMap, and made BlockManagerSlaveActor cleanu… (tdas, Mar 18, 2014)
e1fba5f  Style fix (tdas, Mar 19, 2014)
f2881fd  Changed ContextCleaner to use ReferenceQueue instead of finalizer (tdas, Mar 25, 2014)
620eca3  Changes based on PR comments. (tdas, Mar 25, 2014)
a007307  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas, Mar 25, 2014)
d2f8b97  Removed duplicate unpersistRDD. (tdas, Mar 25, 2014)
6c9dcf6  Added missing Apache license (tdas, Mar 25, 2014)
c7ccef1  Merge branch 'bc-unpersist-merge' of github.com:ignatich/incubator-sp… (andrewor14, Mar 26, 2014)
ba52e00  Refactor broadcast classes (andrewor14, Mar 26, 2014)
d0edef3  Add framework for broadcast cleanup (andrewor14, Mar 26, 2014)
544ac86  Clean up broadcast blocks through BlockManager* (andrewor14, Mar 26, 2014)
e95479c  Add tests for unpersisting broadcast (andrewor14, Mar 27, 2014)
f201a8d  Test broadcast cleanup in ContextCleanerSuite + remove BoundedHashMap (andrewor14, Mar 27, 2014)
c92e4d9  Merge github.com:apache/spark into cleanup (andrewor14, Mar 27, 2014)
0d17060  Import, comments, and style fixes (minor) (andrewor14, Mar 28, 2014)
34f436f  Generalize BroadcastBlockId to remove BroadcastHelperBlockId (andrewor14, Mar 28, 2014)
fbfeec8  Add functionality to query executors for their local BlockStatuses (andrewor14, Mar 29, 2014)
88904a3  Make TimeStampedWeakValueHashMap a wrapper of TimeStampedHashMap (andrewor14, Mar 29, 2014)
e442246  Merge github.com:apache/spark into cleanup (andrewor14, Mar 29, 2014)
8557c12  Merge github.com:apache/spark into cleanup (andrewor14, Mar 30, 2014)
7edbc98  Merge remote-tracking branch 'apache-github/master' into state-cleanup (tdas, Mar 31, 2014)
634a097  Merge branch 'state-cleanup' of github.com:tdas/spark into cleanup (andrewor14, Mar 31, 2014)
7ed72fb  Fix style test fail + remove verbose test message regarding broadcast (andrewor14, Mar 31, 2014)
5016375  Address TD's comments (andrewor14, Apr 1, 2014)
f0aabb1  Correct semantics for TimeStampedWeakValueHashMap + add tests (andrewor14, Apr 2, 2014)
762a4d8  Merge pull request #1 from andrewor14/cleanup (tdas, Apr 2, 2014)
a6460d4  Merge github.com:apache/spark into cleanup (andrewor14, Apr 4, 2014)
c5b1d98  Address Patrick's comments (andrewor14, Apr 4, 2014)
a2cc8bc  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas, Apr 4, 2014)
ada45f0  Merge branch 'state-cleanup' of github.com:tdas/spark into cleanup (andrewor14, Apr 4, 2014)
cd72d19  Make automatic cleanup configurable (not documented) (andrewor14, Apr 4, 2014)
b27f8e8  Merge pull request #3 from andrewor14/cleanup (tdas, Apr 4, 2014)
a430f06  Fixed compilation errors. (tdas, Apr 4, 2014)
104a89a  Fixed failing BroadcastSuite unit tests by introducing blocking for r… (tdas, Apr 4, 2014)
6222697  Fixed bug and adding unit test for removeBroadcast in BlockManagerSuite. (tdas, Apr 4, 2014)
41c9ece  Added more unit tests for BlockManager, DiskBlockManager, and Context… (tdas, Apr 7, 2014)
2b95b5e  Added more documentation on Broadcast implementations, specially whic… (tdas, Apr 7, 2014)
4d05314  Scala style fix. (tdas, Apr 7, 2014)
cff023c  Fixed issues based on Andrew's comments. (tdas, Apr 7, 2014)
d25a86e  Fixed stupid typo. (tdas, Apr 7, 2014)
f489fdc  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas, Apr 8, 2014)
61b8d6e  Fixed issue with Tachyon + new BlockManager methods. (tdas, Apr 8, 2014)
Add functionality to query executors for their local BlockStatuses
Not all blocks are reported to the master. In HttpBroadcast and
TorrentBroadcast, for instance, most blocks are not reported to the master.
Without a mechanism to query each executor for its local block statuses,
it is difficult to test the correctness of persisting and unpersisting a broadcast.

This new functionality, though only used for testing at the moment, is
general enough to be used for other things in the future.
andrewor14 committed Mar 29, 2014
commit fbfeec80cfb7a1bd86847fa22f641d9b9ad7480f
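
For illustration only, here is a minimal sketch (assuming a BlockManagerMaster wired up as in the test suites changed below; the helper name is hypothetical) of how a test might use the new query to check where a broadcast block lives:

```scala
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster, BlockStatus, StorageLevel}

// Hypothetical test helper: verify that a block is persisted only on the driver.
def assertOnDriverOnly(bmm: BlockManagerMaster, blockId: BlockId): Unit = {
  // askSlaves = true queries every executor directly, since broadcast blocks
  // are not necessarily reported to the master.
  val statuses: Map[BlockManagerId, BlockStatus] = bmm.getBlockStatus(blockId, askSlaves = true)
  assert(statuses.size == 1, "Block should only be on the driver")
  statuses.foreach { case (bm, status) =>
    assert(bm.executorId == "<driver>")
    assert(status.storageLevel == StorageLevel.MEMORY_AND_DISK)
    assert(status.memSize > 0 && status.diskSize == 0)
  }
}
```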
@@ -17,7 +17,6 @@

package org.apache.spark.network

import java.net._
import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -79,3 +79,5 @@ private object BlockInfo {
private val BLOCK_PENDING: Long = -1L
private val BLOCK_FAILED: Long = -2L
}

private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -209,10 +209,14 @@ private[spark] class BlockManager(
}
}

/**
* Get storage level of local block. If no info exists for the block, return None.
*/
def getLevel(blockId: BlockId): Option[StorageLevel] = blockInfo.get(blockId).map(_.level)
/** Return the status of the block identified by the given ID, if it exists. */
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
BlockStatus(info.level, memSize, diskSize)
}
}

/**
* Tell the master about the current storage status of a block. This will send a block update
@@ -631,10 +635,9 @@ private[spark] class BlockManager(
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) => {
case ByteBufferValues(bytes) =>
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
}
size = res.size
res.data match {
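
As an aside, a hypothetical caller of the new BlockManager.getStatus (illustrative only, not part of this diff) might consume the returned BlockStatus like this:

```scala
import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus}

// Hypothetical helper: describe where a block currently lives on this executor.
def describeLocalBlock(bm: BlockManager, blockId: BlockId): String =
  bm.getStatus(blockId) match {
    case Some(BlockStatus(level, memSize, diskSize)) =>
      s"$blockId: level=$level, inMemory=${memSize > 0} ($memSize bytes), onDisk=${diskSize > 0} ($diskSize bytes)"
    case None =>
      s"$blockId: no local block info"
  }
```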
@@ -148,21 +148,30 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}

/**
* Mainly for testing. Ask the driver to query all executors for their storage levels
* regarding this block. This provides an avenue for the driver to learn the storage
* levels of blocks it has not been informed of.
* Return the block's local status on all block managers, if any.
*
* WARNING: This could lead to deadlocks if there are any outstanding messages the
* executors are already expecting from the driver. In this case, while the driver is
* waiting for the executors to respond to its GetStorageLevel query, the executors
* are also waiting for a response from the driver to a prior message.
* If askSlaves is true, this invokes the master to query each block manager for the most
* updated block statuses. This is useful when the master is not informed of the given block
* by all block managers.
*
* The interim solution is to wait for a brief window of time to pass before asking.
* This should suffice, since this mechanism is largely introduced for testing only.
* To avoid potential deadlocks, the use of Futures is necessary, because the master actor
* should not block on waiting for a block manager, which can in turn be waiting for the
* master actor for a response to a prior message.
*/
def askForStorageLevels(blockId: BlockId, waitTimeMs: Long = 1000) = {
Thread.sleep(waitTimeMs)
askDriverWithReply[Map[BlockManagerId, StorageLevel]](AskForStorageLevels(blockId))
def getBlockStatus(
blockId: BlockId,
askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
Reviewer comment (Contributor):
This seems like a pretty expensive operation - what if there are hundreds of BlockManagers. It might make sense to say in the doc that this should only be used for testing. Otherwise people will come along and use it without understanding the performance implications.

Another thought here (let's talk offline) we should make it explicit which blocks the BlockManagerMaster is always informed about vs which ones it might not know about. Right now it's not made explicit anywhere and it's hard to reason about.

val msg = GetBlockStatus(blockId, askSlaves)
val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
val (blockManagerIds, futures) = response.unzip
val result = Await.result(Future.sequence(futures), timeout)
if (result == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
status.map { s => (blockManagerId, s) }
}.toMap
}

Reviewer comment (Contributor):
This is introduced for testing broadcast cleanup (but can be used for other things in the future).

/** Stop the driver actor, called only on the Spark driver node */
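
The getBlockStatus implementation above relies on the master replying with a map of unresolved Futures and the calling side doing the single blocking wait. A minimal, Spark-independent sketch of that aggregation pattern (collectDefined is a hypothetical helper, shown only to clarify the control flow):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext.Implicits.global

// Given one pending Option result per node, block once on the combined future
// and keep only the nodes that actually reported a value.
def collectDefined[K, V](pending: Map[K, Future[Option[V]]],
                         timeout: FiniteDuration): Map[K, V] = {
  val (keys, futures) = pending.unzip
  val results = Await.result(Future.sequence(futures), timeout)
  keys.zip(results).collect { case (k, Some(v)) => (k, v) }.toMap
}
```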
@@ -21,7 +21,7 @@ import java.util.{HashMap => JHashMap}

import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.concurrent.{Await, Future}
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.{Actor, ActorRef, Cancellable}
@@ -93,6 +93,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetStorageStatus =>
sender ! storageStatus

case GetBlockStatus(blockId, askSlaves) =>
sender ! blockStatus(blockId, askSlaves)

case RemoveRdd(rddId) =>
sender ! removeRdd(rddId)

@@ -126,9 +129,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case HeartBeat(blockManagerId) =>
sender ! heartBeat(blockManagerId)

case AskForStorageLevels(blockId) =>
sender ! askForStorageLevels(blockId)

case other =>
logWarning("Got unknown message: " + other)
}
@@ -254,16 +254,30 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}.toArray
}

// For testing. Ask all block managers for the given block's local storage level, if any.
private def askForStorageLevels(blockId: BlockId): Map[BlockManagerId, StorageLevel] = {
val getStorageLevel = GetStorageLevel(blockId)
blockManagerInfo.values.flatMap { info =>
val future = info.slaveActor.ask(getStorageLevel)(akkaTimeout)
val result = Await.result(future, akkaTimeout)
if (result != null) {
// If the block does not exist on the slave, the slave replies None
result.asInstanceOf[Option[StorageLevel]].map { reply => (info.blockManagerId, reply) }
} else None
/**
* Return the block's local status for all block managers, if any.
*
* If askSlaves is true, the master queries each block manager for the most updated block
* statuses. This is useful when the master is not informed of the given block by all block
* managers.
*
* Rather than blocking on the block status query, master actor should simply return a
* Future to avoid potential deadlocks. This can arise if there exists a block manager
* that is also waiting for this master actor's response to a previous message.
*/
private def blockStatus(
blockId: BlockId,
askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
import context.dispatcher
val getBlockStatus = GetBlockStatus(blockId)
blockManagerInfo.values.map { info =>
val blockStatusFuture =
if (askSlaves) {
info.slaveActor.ask(getBlockStatus)(akkaTimeout).mapTo[Option[BlockStatus]]
} else {
Future { info.getStatus(blockId) }
}
(info.blockManagerId, blockStatusFuture)
}.toMap
}

@@ -352,9 +366,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}


private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)

private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
Expand All @@ -371,6 +382,8 @@ private[spark] class BlockManagerInfo(
logInfo("Registering block manager %s with %s RAM".format(
blockManagerId.hostPort, Utils.bytesToString(maxMem)))

def getStatus(blockId: BlockId) = Option(_blocks.get(blockId))

def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
}
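
To make the deadlock-avoidance argument in the doc comment above concrete, here is a minimal plain-Akka sketch (illustrative names, not Spark code) of an actor that fans out asks and replies with the still-pending futures instead of blocking on them itself:

```scala
import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

case class Query(key: String)

// The master never calls Await: it hands the per-worker futures back to the
// requester. (Sending futures only works in-process, since they are not
// serializable; here the requester lives in the same JVM as the master actor.)
class NonBlockingMaster(workers: Seq[ActorRef]) extends Actor {
  implicit val timeout: Timeout = Timeout(10.seconds)

  def receive = {
    case Query(key) =>
      val pending: Seq[Future[Option[String]]] =
        workers.map(w => (w ? Query(key)).mapTo[Option[String]])
      sender ! pending
  }
}
```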
@@ -41,9 +41,6 @@ private[storage] object BlockManagerMessages {
case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
extends ToBlockManagerSlave

// For testing. Ask the slave for the block's storage level.
case class GetStorageLevel(blockId: BlockId) extends ToBlockManagerSlave


//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
@@ -113,10 +110,10 @@ private[storage] object BlockManagerMessages {

case object GetMemoryStatus extends ToBlockManagerMaster

case object ExpireDeadHosts extends ToBlockManagerMaster

case object GetStorageStatus extends ToBlockManagerMaster

// For testing. Have the master ask all slaves for the given block's storage level.
case class AskForStorageLevels(blockId: BlockId) extends ToBlockManagerMaster
case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
extends ToBlockManagerMaster

case object ExpireDeadHosts extends ToBlockManagerMaster
}
@@ -50,7 +50,7 @@ class BlockManagerSlaveActor(
case RemoveBroadcast(broadcastId, removeFromDriver) =>
blockManager.removeBroadcast(broadcastId, removeFromDriver)

case GetStorageLevel(blockId) =>
sender ! blockManager.getLevel(blockId)
case GetBlockStatus(blockId, _) =>
sender ! blockManager.getStatus(blockId)
}
}
69 changes: 40 additions & 29 deletions core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -107,32 +107,40 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that the broadcast file is created, and blocks are persisted only on the driver
def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
assert(levels.size === 1)
levels.head match { case (bm, level) =>
assert(bm.executorId === "<driver>")
assert(level === StorageLevel.MEMORY_AND_DISK)
val statuses = bmm.getBlockStatus(blockIds.head)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")
}
assert(HttpBroadcast.getFile(blockIds.head.broadcastId).exists)
assert(HttpBroadcast.getFile(blockIds.head.broadcastId).exists, "Broadcast file not found!")
}

// Verify that blocks are persisted in both the executors and the driver
def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
assert(levels.size === numSlaves + 1)
levels.foreach { case (_, level) =>
assert(level === StorageLevel.MEMORY_AND_DISK)
val statuses = bmm.getBlockStatus(blockIds.head)
assert(statuses.size === numSlaves + 1)
statuses.foreach { case (_, status) =>
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store")
assert(status.diskSize === 0, "Block should not be in disk store")
}
}

// Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver
// is true. In the latter case, also verify that the broadcast file is deleted on the driver.
def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
assert(levels.size === (if (removeFromDriver) 0 else 1))
assert(removeFromDriver === !HttpBroadcast.getFile(blockIds.head.broadcastId).exists)
val statuses = bmm.getBlockStatus(blockIds.head)
val expectedNumBlocks = if (removeFromDriver) 0 else 1
val possiblyNot = if (removeFromDriver) "" else " not"
assert(statuses.size === expectedNumBlocks,
"Block should%s be unpersisted on the driver".format(possiblyNot))
assert(removeFromDriver === !HttpBroadcast.getFile(blockIds.head.broadcastId).exists,
"Broadcast file should%s be deleted".format(possiblyNot))
}

testUnpersistBroadcast(numSlaves, httpConf, getBlockIds, afterCreation,
Expand All @@ -158,28 +166,32 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that blocks are persisted only on the driver
def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
blockIds.foreach { blockId =>
val levels = bmm.askForStorageLevels(blockId, waitTimeMs = 0)
assert(levels.size === 1)
levels.head match { case (bm, level) =>
assert(bm.executorId === "<driver>")
assert(level === StorageLevel.MEMORY_AND_DISK)
val statuses = bmm.getBlockStatus(blockIds.head)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")
}
}
}

// Verify that blocks are persisted in both the executors and the driver
def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
blockIds.foreach { blockId =>
val levels = bmm.askForStorageLevels(blockId, waitTimeMs = 0)
val statuses = bmm.getBlockStatus(blockId)
if (blockId.field == "meta") {
// Meta data is only on the driver
assert(levels.size === 1)
levels.head match { case (bm, _) => assert(bm.executorId === "<driver>") }
assert(statuses.size === 1)
statuses.head match { case (bm, _) => assert(bm.executorId === "<driver>") }
} else {
// Other blocks are on both the executors and the driver
assert(levels.size === numSlaves + 1)
levels.foreach { case (_, level) =>
assert(level === StorageLevel.MEMORY_AND_DISK)
assert(statuses.size === numSlaves + 1)
statuses.foreach { case (_, status) =>
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store")
assert(status.diskSize === 0, "Block should not be in disk store")
}
}
}
Expand All @@ -189,12 +201,11 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// is true.
def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
val expectedNumBlocks = if (removeFromDriver) 0 else 1
var waitTimeMs = 1000L
val possiblyNot = if (removeFromDriver) "" else " not"
blockIds.foreach { blockId =>
// Allow a second for the messages triggered by unpersist to propagate to prevent deadlocks
val levels = bmm.askForStorageLevels(blockId, waitTimeMs)
assert(levels.size === expectedNumBlocks)
waitTimeMs = 0L
val statuses = bmm.getBlockStatus(blockId)
assert(statuses.size === expectedNumBlocks,
"Block should%s be unpersisted on the driver".format(possiblyNot))
}
}

@@ -267,7 +267,7 @@ class CleanerTester(
"One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test")

// Verify that the broadcast is in the driver's block manager
assert(broadcastIds.forall(bid => blockManager.getLevel(broadcastBlockId(bid)).isDefined),
assert(broadcastIds.forall(bid => blockManager.getStatus(broadcastBlockId(bid)).isDefined),
"One ore more broadcasts have not been persisted in the driver's block manager")
}

@@ -291,7 +291,7 @@ class CleanerTester(

// Verify all broadcasts have been unpersisted
assert(broadcastIds.forall { bid =>
blockManager.master.askForStorageLevels(broadcastBlockId(bid)).isEmpty
blockManager.master.getBlockStatus(broadcastBlockId(bid)).isEmpty
})

return
@@ -745,6 +745,46 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(!store.get("list5").isDefined, "list5 was in store")
}

test("query block statuses") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)
val list = List.fill(2)(new Array[Byte](200))

// Tell master. By LRU, only list2 and list3 remains.
store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)

// getLocations and getBlockStatus should yield the same locations
assert(store.master.getLocations("list1").size === 0)
assert(store.master.getLocations("list2").size === 1)
assert(store.master.getLocations("list3").size === 1)
assert(store.master.getBlockStatus("list1", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list2", askSlaves = false).size === 1)
assert(store.master.getBlockStatus("list3", askSlaves = false).size === 1)
assert(store.master.getBlockStatus("list1", askSlaves = true).size === 0)
assert(store.master.getBlockStatus("list2", askSlaves = true).size === 1)
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)

// This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)

// getLocations should return nothing because the master is not informed
// getBlockStatus without asking slaves should have the same result
// getBlockStatus with asking slaves, however, should present the actual block statuses
assert(store.master.getLocations("list4").size === 0)
assert(store.master.getLocations("list5").size === 0)
assert(store.master.getLocations("list6").size === 0)
assert(store.master.getBlockStatus("list4", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list5", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list6", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list4", askSlaves = true).size === 0)
assert(store.master.getBlockStatus("list5", askSlaves = true).size === 1)
assert(store.master.getBlockStatus("list6", askSlaves = true).size === 1)
}

test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)