Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Limit scope of synchronized blocks to avoid deadlocks
  • Loading branch information
Andrew Or committed Oct 6, 2015
commit e73e463ccaf8220387903b59b9a6ab71662d2ee7
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ private[spark] abstract class MemoryManager {
*/
def releaseStorageMemory(numBytes: Long): Unit

/**
* Release all storage memory acquired.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This releases all storage memory acquired JVM-wide, across all tasks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's called in MemoryStore#clear

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. It looks like BlockStore#clear is only called during SparkContext shutdown, so I guess it technically doesn't matter if you update the memory accounting there. Doesn't hurt, I guess, but not strictly necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know, if def releaseStorageMemory(numBytes: Long) doesn't throw when releasing more than the total amount of storage memory, you could just eliminate this method and call releaseStorageMemory(Long.MAX_VALUE) instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think releasing more than you have is a bad sign. I actually log a warning for that. It's probably OK to just keep this method.

*/
def releaseStorageMemory(): Unit

/**
* Release N bytes of unroll memory.
*/
Expand Down
34 changes: 23 additions & 11 deletions core/src/main/scala/org/apache/spark/StaticMemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,10 @@ private[spark] class StaticMemoryManager(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
storageLock.synchronized {
val currentUnrollMemory = memoryStore.currentUnrollMemory
val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
val currentUnrollMemory = memoryStore.currentUnrollMemory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any synchronization concerns here w.r.t. currentUnrollMemory? Haven't thought this through, but just wanted to ask.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's this accountingLock in MemoryStore that you always acquire before requesting any storage / unroll memory, so it should already be handled there. Maybe we should add a comment there to express this assumption.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in reserveUnrollMemoryForThisTask

val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}

/**
Expand All @@ -129,8 +127,9 @@ private[spark] class StaticMemoryManager(
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
// Note: Keep this outside synchronized block to avoid potential deadlocks!
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
storageLock.synchronized {
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
assert(_storageMemoryUsed <= maxStorageMemory)
val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
val bytesToGrant = if (enoughMemory) numBytesToAcquire else 0
Expand Down Expand Up @@ -169,6 +168,15 @@ private[spark] class StaticMemoryManager(
}
}

/**
* Release all storage memory acquired.
*/
// Reset the storage-memory bookkeeping to zero under `storageLock`.
// Note: this only updates the accounting; dropping the actual cached
// entries is done by the caller (see MemoryStore#clear).
override def releaseStorageMemory(): Unit = storageLock.synchronized {
  _storageMemoryUsed = 0
}

/**
* Release N bytes of unroll memory.
*/
Expand All @@ -179,15 +187,19 @@ private[spark] class StaticMemoryManager(
/**
* Amount of execution memory currently in use, in bytes.
*/
override def executionMemoryUsed: Long = executionLock.synchronized {
_executionMemoryUsed
// Read the execution-memory counter under `executionLock` so callers
// observe a value consistent with concurrent acquire/release updates.
override def executionMemoryUsed: Long =
  executionLock.synchronized { _executionMemoryUsed }

/**
* Amount of storage memory currently in use, in bytes.
*/
override def storageMemoryUsed: Long = storageLock.synchronized {
_storageMemoryUsed
// Read the storage-memory counter under `storageLock` so callers
// observe a value consistent with concurrent acquire/release updates.
override def storageMemoryUsed: Long =
  storageLock.synchronized { _storageMemoryUsed }

}
Expand Down
19 changes: 9 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}

override def remove(blockId: BlockId): Boolean = {
entries.synchronized {
val entry = entries.remove(blockId)
if (entry != null) {
memoryManager.releaseStorageMemory(entry.size)
logDebug(s"Block $blockId of size ${entry.size} dropped " +
s"from memory (free ${maxMemory - blocksMemoryUsed})")
true
} else {
false
}
val entry = entries.synchronized { entries.remove(blockId) }
if (entry != null) {
memoryManager.releaseStorageMemory(entry.size)
logDebug(s"Block $blockId of size ${entry.size} dropped " +
s"from memory (free ${maxMemory - blocksMemoryUsed})")
true
} else {
false
}
}

/**
 * Remove all blocks from this store and release all storage memory acquired.
 *
 * The `entries` lock is held only while clearing the map; the memory-manager
 * call happens outside that lock, in keeping with this change's goal of
 * limiting the scope of synchronized blocks to avoid deadlocks.
 */
override def clear(): Unit = {
  entries.synchronized {
    entries.clear()
  }
  memoryManager.releaseStorageMemory()
  logInfo("MemoryStore cleared")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private class GrantEverythingMemoryManager extends MemoryManager {
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes
override def releaseExecutionMemory(numBytes: Long): Unit = { }
override def releaseStorageMemory(numBytes: Long): Unit = { }
override def releaseStorageMemory(): Unit = { }
override def releaseUnrollMemory(numBytes: Long): Unit = { }
override def maxExecutionMemory: Long = Long.MaxValue
override def maxStorageMemory: Long = Long.MaxValue
Expand Down