Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update tests for branch-1.6
  • Loading branch information
Andrew Or committed May 11, 2016
commit 952d95c46478d803ec489add952c4ab120b9007f
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
storageRegionSize,
maxMemory - storageRegionSize) {

assertInvariant()

// We always maintain this invariant:
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
private def assertInvariant(): Unit = {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
}

override def maxStorageMemory: Long = synchronized {
maxMemory - onHeapExecutionMemoryPool.memoryUsed
Expand All @@ -77,7 +81,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assertInvariant()
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
Expand Down Expand Up @@ -137,7 +141,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assertInvariant()
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,17 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val memoryMode = MemoryMode.ON_HEAP
// Acquire 1000 then release 600 bytes of storage memory, leaving the
// storage memory pool at 1000 bytes but only 400 bytes of which are used.
assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode))
mm.releaseStorageMemory(600L, memoryMode)
assert(mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
mm.releaseStorageMemory(600L)
// Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
// unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
// by the same amount. If the eviction threw an exception, then we would shrink one pool
// without enlarging the other, resulting in an assertion failure.
intercept[RuntimeException] {
mm.acquireExecutionMemory(1000L, 0, memoryMode)
}
val assertInvariants = PrivateMethod[Unit]('assertInvariants)
mm.invokePrivate[Unit](assertInvariants())
val assertInvariant = PrivateMethod[Unit]('assertInvariant)
mm.invokePrivate[Unit](assertInvariant())
}

}