Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Route MemoryStore through new interface
All storage memory is now acquired through the new MemoryManager
interface. This requires non-trivial refactoring due to the fact
that the unrolling and eviction logic are closely intertwined.
  • Loading branch information
Andrew Or committed Oct 5, 2015
commit 933de614df6e84d7776aa1d11ae1d82042f3f6dd
60 changes: 47 additions & 13 deletions core/src/main/scala/org/apache/spark/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.spark

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockStatus}


/**
* An abstract memory manager that enforces how memory is shared between execution and storage.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment to clarify whether this is a one-per-JVM component or one-per-task component?

*
Expand All @@ -27,26 +32,30 @@ package org.apache.spark
private[spark] abstract class MemoryManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this somewhere else that's not at the top level? I'd like to minimize the number of private[spark] classes that appear at the top level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on moving this out of the top-level and into a new package (maybe spark.memory ?). This will also provide a convenient way to group the memory-related classes together post-refactoring.


/**
* Total available memory for execution, in bytes.
*/
def maxExecutionMemory: Long

/**
* Total available memory for storage, in bytes.
* Acquire N bytes of memory for execution.
* @return number of bytes successfully granted (<= N).
*/
def maxStorageMemory: Long
def acquireExecutionMemory(numBytes: Long): Long

/**
* Acquire N bytes of memory for execution.
* @return whether the number bytes successfully granted (<= N).
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (0 or N).
*/
def acquireExecutionMemory(numBytes: Long): Long
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long

/**
* Acquire N bytes of memory for storage.
* @return whether the number bytes successfully granted (<= N).
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (<= N).
*/
def acquireStorageMemory(numBytes: Long): Long
def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long

/**
* Release N bytes of execution memory.
Expand All @@ -58,4 +67,29 @@ private[spark] abstract class MemoryManager {
*/
def releaseStorageMemory(numBytes: Long): Unit

/**
* Release N bytes of unroll memory.
*/
def releaseUnrollMemory(numBytes: Long): Unit

/**
* Total available memory for execution, in bytes.
*/
def maxExecutionMemory: Long

/**
* Total available memory for storage, in bytes.
*/
def maxStorageMemory: Long

/**
* Execution memory currently in use, in bytes.
*/
def executionMemoryUsed: Long

/**
* Storage memory currently in use, in bytes.
*/
def storageMemoryUsed: Long

}
163 changes: 124 additions & 39 deletions core/src/main/scala/org/apache/spark/StaticMemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}


/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
Expand All @@ -25,52 +29,112 @@ package org.apache.spark
* `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
* regions are cleanly separated such that neither usage can borrow memory from the other.
*/
private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf)
private[spark] class StaticMemoryManager(
conf: SparkConf,
override val maxExecutionMemory: Long,
override val maxStorageMemory: Long)
extends MemoryManager with Logging {

private val _maxExecutionMemory: Long = StaticMemoryManager.getMaxExecutionMemory(conf)
private val _maxStorageMemory: Long = StaticMemoryManager.getMaxStorageMemory(conf)
private val executionMemoryLock = new Object
private val storageMemoryLock = new Object
// Upper bound, in bytes, on the blocks that may be evicted to make room for unrolling.
// Computed as `spark.storage.unrollFraction` (default 0.2) of the storage region, truncated.
private val maxMemoryToEvictForUnroll: Long = {
  val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2)
  (maxStorageMemory * unrollFraction).toLong
}

// Amount of execution memory in use. Accesses must be synchronized on `executionLock`.
private var _executionMemoryUsed: Long = 0
private val executionLock = new Object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: do we need two separate locks, or would using a single lock by synchronizing on this suffice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just thought we'd have a little more parallelism if we use two locks


// Amount of storage memory in use. Accesses must be synchronized on `storageLock`.
private var _storageMemoryUsed: Long = 0
private val storageLock = new Object

// The memory store used to evict cached blocks
private var _memoryStore: MemoryStore = _
private def memoryStore: MemoryStore = {
  // Resolve lazily: if no store has been set yet, take the one owned by the
  // current SparkEnv's block manager and remember it for later calls.
  if (_memoryStore != null) {
    _memoryStore
  } else {
    _memoryStore = SparkEnv.get.blockManager.memoryStore
    _memoryStore
  }
}

// All accesses must be synchronized on `executionMemoryLock`
private var executionMemoryUsed: Long = 0
// For testing only
def setMemoryStore(store: MemoryStore): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just accept the MemoryStore via the constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialization order :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to explicitly call this method in the non-testing path as well? e.g. remove the SparkEnv.get from inside of this class and call this when wiring up the components?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, in BlockManager's constructor we can do the following:

private val memoryStore = new MemoryStore(...)
memoryManager.setMemoryStore(memoryStore)

do you think that's better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that's clearer.

_memoryStore = store
}

// All accesses must be synchronized on `storageMemoryLock`
private var storageMemoryUsed: Long = 0
/**
 * Create a manager whose execution and storage region sizes are both derived from `conf`
 * through the helpers on the companion object.
 */
def this(conf: SparkConf) {
  this(
    conf,
    maxExecutionMemory = StaticMemoryManager.getMaxExecutionMemory(conf),
    maxStorageMemory = StaticMemoryManager.getMaxStorageMemory(conf))
}

/**
* Total available memory for execution, in bytes.
* Acquire N bytes of memory for execution.
* @return number of bytes successfully granted (<= N).
*/
override def maxExecutionMemory: Long = _maxExecutionMemory
override def acquireExecutionMemory(numBytes: Long): Long = executionLock.synchronized {
  // Usage is expected never to exceed the size of the execution region.
  assert(_executionMemoryUsed <= maxExecutionMemory)
  // Grant the whole request if it fits, otherwise only what is still available.
  val available = maxExecutionMemory - _executionMemoryUsed
  val granted = math.min(numBytes, available)
  _executionMemoryUsed += granted
  granted
}

/**
* Total available memory for storage, in bytes.
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (0 or N).
*/
override def maxStorageMemory: Long = _maxStorageMemory
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
  // When caching a block, the space to free by eviction is the same as the space to
  // acquire, so the request size is passed for both.
  val bytesToAcquire = numBytes
  val bytesToFree = numBytes
  acquireStorageMemory(blockId, bytesToAcquire, bytesToFree, evictedBlocks)
}

/**
* Acquire N bytes of memory for execution.
* @return whether the number bytes successfully granted (<= N).
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
*
* This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage
* space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any,
* are added to `evictedBlocks`.
*
* @return number of bytes successfully granted (0 or N).
*/
override def acquireExecutionMemory(numBytes: Long): Long = {
executionMemoryLock.synchronized {
assert(_maxExecutionMemory >= executionMemoryUsed)
val bytesToGrant = math.min(numBytes, _maxExecutionMemory - executionMemoryUsed)
executionMemoryUsed += bytesToGrant
bytesToGrant
override def acquireUnrollMemory(
    blockId: BlockId,
    numBytes: Long,
    evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = storageLock.synchronized {
  // Remaining eviction budget for unrolling: the configured cap minus the unroll memory
  // the memory store currently reports, floored at zero.
  val unrollInUse = memoryStore.currentUnrollMemory
  val evictionBudget = math.max(0, maxMemoryToEvictForUnroll - unrollInUse)
  // Never request more eviction than the number of bytes being asked for.
  val bytesToFree = math.min(numBytes, evictionBudget)
  acquireStorageMemory(blockId, numBytes, bytesToFree, evictedBlocks)
}

/**
* Acquire N bytes of memory for storage.
* @return whether the number bytes successfully granted (<= N).
* Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
*
* @param blockId the ID of the block we are acquiring storage memory for
* @param numBytesToAcquire the size of this block
* @param numBytesToFree the size of space to be freed through evicting blocks
* @param evictedBlocks a holder for blocks evicted in the process
* @return number of bytes successfully granted (0 or `numBytesToAcquire`).
*/
override def acquireStorageMemory(numBytes: Long): Long = {
storageMemoryLock.synchronized {
assert(_maxStorageMemory >= storageMemoryUsed)
val bytesToGrant = math.min(numBytes, _maxStorageMemory - storageMemoryUsed)
storageMemoryUsed += bytesToGrant
private def acquireStorageMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long,
    evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
  storageLock.synchronized {
    // Ask the memory store to evict existing blocks first; `evictedBlocks` is handed
    // through so the store can record what it evicted.
    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
    assert(_storageMemoryUsed <= maxStorageMemory)
    // Compare the request against the remaining headroom instead of computing
    // `_storageMemoryUsed + numBytesToAcquire`: that sum can overflow Long when
    // `maxStorageMemory` is very large (e.g. Long.MaxValue), which would wrongly
    // grant the request and wrap the usage counter negative. The assertion above
    // keeps the subtraction from going below zero.
    val remaining = maxStorageMemory - _storageMemoryUsed
    val enoughMemory = numBytesToAcquire <= remaining
    // All-or-nothing: grant the full request or nothing at all.
    val bytesToGrant = if (enoughMemory) numBytesToAcquire else 0L
    _storageMemoryUsed += bytesToGrant
    bytesToGrant
  }
}
Expand All @@ -79,13 +143,13 @@ private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf)
* Release N bytes of execution memory.
*/
override def releaseExecutionMemory(numBytes: Long): Unit = {
executionMemoryLock.synchronized {
if (numBytes > executionMemoryUsed) {
executionLock.synchronized {
if (numBytes > _executionMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of execution " +
s"memory when we only have $executionMemoryUsed bytes")
executionMemoryUsed = 0
s"memory when we only have ${_executionMemoryUsed} bytes")
_executionMemoryUsed = 0
} else {
executionMemoryUsed -= numBytes
_executionMemoryUsed -= numBytes
}
}
}
Expand All @@ -94,21 +158,42 @@ private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf)
* Release N bytes of storage memory.
*/
override def releaseStorageMemory(numBytes: Long): Unit = {
storageMemoryLock.synchronized {
if (numBytes > storageMemoryUsed) {
storageLock.synchronized {
if (numBytes > _storageMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of storage " +
s"memory when we only have $storageMemoryUsed bytes")
storageMemoryUsed = 0
s"memory when we only have ${_storageMemoryUsed} bytes")
_storageMemoryUsed = 0
} else {
storageMemoryUsed -= numBytes
_storageMemoryUsed -= numBytes
}
}
}

/**
 * Release N bytes of unroll memory.
 *
 * This simply delegates to `releaseStorageMemory` with the same byte count.
 */
override def releaseUnrollMemory(numBytes: Long): Unit = releaseStorageMemory(numBytes)

/**
 * Number of bytes of execution memory currently in use.
 * The counter is read while holding `executionLock`.
 */
override def executionMemoryUsed: Long = {
  executionLock.synchronized { _executionMemoryUsed }
}

/**
 * Number of bytes of storage memory currently in use.
 * The counter is read while holding `storageLock`.
 */
override def storageMemoryUsed: Long = {
  storageLock.synchronized { _storageMemoryUsed }
}

}


private object StaticMemoryManager {
private[spark] object StaticMemoryManager {

/**
* Return the total amount of memory available for the storage region, in bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,8 @@ private[spark] object ShuffleMemoryManager {
* Create a dummy [[ShuffleMemoryManager]] with the specified capacity and page size.
*/
def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = {
val memoryManager = new StaticMemoryManager {
override def maxExecutionMemory: Long = maxMemory
}
val conf = new SparkConf
val memoryManager = new StaticMemoryManager(conf, maxMemory, Long.MaxValue)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a totally minor nit to be picking, but do you mind explicitly naming these parameters at their call sites? e.g.

new StaticMemoryManager(conf, maxExecutionMemory = maxMemory, maxStorageMemory = Long.MaxValue)

I think this makes the code slightly easier to read.

new ShuffleMemoryManager(memoryManager, pageSizeBytes)
}

Expand Down
Loading