Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add test
  • Loading branch information
Andrew Or committed May 11, 2016
commit 0d95847c53a6799fd81e667c70f112cde5dc4c02
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
ms
}

/**
* Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
* stubbed to always throw [[RuntimeException]].
*/
protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
override def answer(invocation: InvocationOnMock): Long = {
throw new RuntimeException("bad memory store!")
}
})
mm.setMemoryStore(ms)
ms
}

/**
* Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(evictedBlocks.nonEmpty)
}

test("SPARK-15260: atomically resize memory pools") {
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
.set("spark.testing.memory", "1000")
val mm = UnifiedMemoryManager(conf, numCores = 2)
makeBadMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
// Acquire 1000 then release 600 bytes of storage memory, leaving the
// storage memory pool at 1000 bytes but only 400 bytes of which are used.
assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode))
mm.releaseStorageMemory(600L, memoryMode)
// Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
// unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
// by the same amount. If the eviction threw an exception, then we would shrink one pool
// without enlarging the other, resulting in an assertion failure.
intercept[RuntimeException] {
mm.acquireExecutionMemory(1000L, 0, memoryMode)
}
val assertInvariants = PrivateMethod[Unit]('assertInvariants)
mm.invokePrivate[Unit](assertInvariants())
}

}