Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rollback
  • Loading branch information
jianran.tfh committed Mar 16, 2017
commit a740052594352f45267812fc65a04082dcf3913c
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
// Same-RDD rule: Put in more partitions from RDD 0; they should replace rdd_1_1
// Do a get() on rdd_0_2 so that it is the most recently used item
assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
Expand All @@ -607,20 +609,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
}

test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = makeBlockManager(12000)
store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// rdd_1_0 should have been replaced, even it's not least recently used.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
}

test("on-disk storage") {
store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
Expand Down Expand Up @@ -1076,7 +1064,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 1)))
}


test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = makeBlockManager(12000)
store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// rdd_1_0 should have been replaced, even it's not least recently used.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
}

test("safely unroll blocks through putIterator (disk)") {
store = makeBlockManager(12000)
Expand Down