Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Pending unroll transfer.
  • Loading branch information
JoshRosen committed Oct 8, 2015
commit b1e8fd16d9f08ff2ff693c44b09143c810c05544
29 changes: 14 additions & 15 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -303,22 +303,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// Otherwise, if we return an iterator, we release the memory reserved here
// later when the task finishes.
if (keepUnrolling) {
val taskAttemptId = currentTaskAttemptId()
accountingLock.synchronized {
// Here, we are logically transferring memory from unroll memory to pending unroll memory.
// We release and re-acquire the memory from the MemoryManager. As of today, this is not
// race-prone because all calls to [acquire|release]UnrollMemoryForThisTask() occur in
// MemoryStore and are guarded by `accountingLock`, MemoryStore is the only component
// which allocates storage memory, and unroll memory is currently counted towards
// storage memory. If we ever change things so that unroll memory is counted towards
// execution memory, then we will need to revisit this argument as it may no longer hold.
// TODO: revisit this as part of SPARK-10983.
val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved
releaseUnrollMemoryForThisTask(amountToRelease)
val acquired = memoryManager.acquireUnrollMemory(blockId, amountToRelease, droppedBlocks)
assert(acquired == amountToRelease)
val taskAttemptId = currentTaskAttemptId()
// Here, we transfer memory from unroll to pending unroll because we expect to cache this
// block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in
// order to avoid race conditions where another component steals the memory that we're
// trying to transfer.
val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToRelease
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
}
}
}
Expand Down Expand Up @@ -374,7 +368,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// Note: if we have previously unrolled this block successfully, then pending unroll
// memory should be non-zero. This is the amount that we already reserved during the
// unrolling process. In this case, we can just reuse this space to cache our block.
// This must be synchronized so the release and re-acquire can happen atomically.
//
// Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the
// synchronization on `accountingLock` guarantees that the release of unroll memory and
// acquisition of storage memory happen atomically. However, if storage memory is acquired
// outside of MemoryStore or if unroll memory is counted as execution memory, then we will
// have to revisit this assumption. See SPARK-10983 for more context.
releasePendingUnrollMemoryForThisTask()
val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
val enoughMemory = numBytesAcquired == size
Expand Down