-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-10984] Simplify *MemoryManager class structure #9127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
b7c9c23
25ba4b5
3d997ce
d9e6b84
98ef86b
8f93e94
3bbc54d
88a7970
ec48ff9
6f98bc4
6459397
60c66b2
7d6a37f
0dc21dc
f21b767
46ad693
c33e330
ef45d91
c7eac69
d86f435
bba5550
66ae259
b1d5151
d0c0dd9
48149fc
c8ba196
63a6cbc
6ec9c30
1593fad
64bec0b
f9240e9
a95bc08
b3ad761
a7e8320
e874a45
2ba6e51
0c13723
04ec429
e56d039
aa14113
7addf8b
5af0b17
a264703
f2ab708
0b5c72f
f68fdb1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,8 @@ import javax.annotation.concurrent.GuardedBy | |
| import scala.collection.mutable | ||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting | ||
|
|
||
| import org.apache.spark.{SparkException, TaskContext, SparkConf, Logging} | ||
| import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} | ||
| import org.apache.spark.unsafe.array.ByteArrayMethods | ||
|
|
@@ -41,7 +43,8 @@ import org.apache.spark.unsafe.memory.MemoryAllocator | |
| * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the | ||
| * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever | ||
| * this set changes. This is all done by synchronizing access to mutable state and using wait() and | ||
| * notifyAll() to signal changes to callers. | ||
| * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across | ||
| * tasks was performed by the ShuffleMemoryManager. | ||
| */ | ||
| private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int = 1) extends Logging { | ||
| // TODO(josh) pass in numCores | ||
|
|
@@ -60,6 +63,8 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int = 1) | |
| // Amount of execution/storage memory in use, accesses must be synchronized on `this` | ||
| @GuardedBy("this") protected var _executionMemoryUsed: Long = 0 | ||
| @GuardedBy("this") protected var _storageMemoryUsed: Long = 0 | ||
| // Map from taskAttemptId -> memory consumption in bytes | ||
| @GuardedBy("this") private val memoryConsumptionForTask = new mutable.HashMap[Long, Long]() | ||
|
|
||
| /** | ||
| * Set the [[MemoryStore]] used by this manager to evict cached blocks. | ||
|
|
@@ -81,15 +86,6 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int = 1) | |
|
|
||
| // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985) | ||
|
|
||
| /** | ||
| * Acquire N bytes of memory for execution, evicting cached blocks if necessary. | ||
| * Blocks evicted in the process, if any, are added to `evictedBlocks`. | ||
| * @return number of bytes successfully granted (<= N). | ||
| */ | ||
| def acquireExecutionMemory( | ||
| numBytes: Long, | ||
| evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long | ||
|
|
||
| /** | ||
| * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. | ||
| * Blocks evicted in the process, if any, are added to `evictedBlocks`. | ||
|
|
@@ -118,91 +114,59 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int = 1) | |
| } | ||
|
|
||
| /** | ||
| * Release N bytes of execution memory. | ||
| */ | ||
| def releaseExecutionMemory(numBytes: Long): Unit = synchronized { | ||
| if (numBytes > _executionMemoryUsed) { | ||
| logWarning(s"Attempted to release $numBytes bytes of execution " + | ||
| s"memory when we only have ${_executionMemoryUsed} bytes") | ||
| _executionMemoryUsed = 0 | ||
| } else { | ||
| _executionMemoryUsed -= numBytes | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Release N bytes of storage memory. | ||
| */ | ||
| def releaseStorageMemory(numBytes: Long): Unit = synchronized { | ||
| if (numBytes > _storageMemoryUsed) { | ||
| logWarning(s"Attempted to release $numBytes bytes of storage " + | ||
| s"memory when we only have ${_storageMemoryUsed} bytes") | ||
| _storageMemoryUsed = 0 | ||
| } else { | ||
| _storageMemoryUsed -= numBytes | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Release all storage memory acquired. | ||
| */ | ||
| def releaseAllStorageMemory(): Unit = synchronized { | ||
| _storageMemoryUsed = 0 | ||
| } | ||
|
|
||
| /** | ||
| * Release N bytes of unroll memory. | ||
| */ | ||
| def releaseUnrollMemory(numBytes: Long): Unit = synchronized { | ||
| releaseStorageMemory(numBytes) | ||
| } | ||
|
|
||
| /** | ||
| * Execution memory currently in use, in bytes. | ||
| */ | ||
| final def executionMemoryUsed: Long = synchronized { | ||
| _executionMemoryUsed | ||
| } | ||
|
|
||
| /** | ||
| * Storage memory currently in use, in bytes. | ||
| * Acquire N bytes of memory for execution, evicting cached blocks if necessary. | ||
| * Blocks evicted in the process, if any, are added to `evictedBlocks`. | ||
| * @return number of bytes successfully granted (<= N). | ||
| */ | ||
| final def storageMemoryUsed: Long = synchronized { | ||
| _storageMemoryUsed | ||
| } | ||
|
|
||
| // -- Policies for arbitrating execution memory across tasks ------------------------------------- | ||
| // Prior to Spark 1.6, these policies were implemented in the ShuffleMemoryManager. | ||
|
|
||
|
|
||
| private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes | ||
| @VisibleForTesting | ||
| private[memory] def doAcquireExecutionMemory( | ||
| numBytes: Long, | ||
| evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long | ||
|
|
||
| /** | ||
| * Try to acquire up to numBytes memory for the current task, and return the number of bytes | ||
| * obtained, or 0 if none can be allocated. This call may block until there is enough free memory | ||
| * in some situations, to make sure each task has a chance to ramp up to at least 1 / 2N of the | ||
| * total memory pool (where N is the # of active tasks) before it is forced to spill. This can | ||
| * happen if the number of tasks increases but an older task had a lot of memory already. | ||
| * Try to acquire up to `numBytes` of execution memory for the current task and return the number | ||
| * of bytes obtained, or 0 if none can be allocated. | ||
| * | ||
| * This call may block until there is enough free memory in some situations, to make sure each | ||
| * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of | ||
| * active tasks) before it is forced to spill. This can happen if the number of tasks increases | ||
| * but an older task had a lot of memory already. | ||
| */ | ||
| def tryToAcquire(numBytes: Long, taskAttemptId: Long): Long = synchronized { | ||
| def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long): Long = synchronized { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. then maybe we should add that "subclasses should override
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
| assert(numBytes > 0, "invalid number of bytes requested: " + numBytes) | ||
|
|
||
| // Add this task to the memoryConsumptionForTask map just so we can keep an accurate count of | ||
| // the number of active tasks, to let other tasks ramp down their memory in calls to | ||
| // acquireExecutionMemory | ||
| if (!taskMemory.contains(taskAttemptId)) { | ||
| taskMemory(taskAttemptId) = 0L | ||
| if (!memoryConsumptionForTask.contains(taskAttemptId)) { | ||
| memoryConsumptionForTask(taskAttemptId) = 0L | ||
| // This will later cause waiting tasks to wake up and check numTasks again | ||
| notifyAll() | ||
| } | ||
|
|
||
| // Once the cross-task memory allocation policy has decided to grant more memory to a task, | ||
| // this method is called in order to actually obtain that execution memory, potentially | ||
| // triggering eviction of storage memory: | ||
| def acquire(toGrant: Long): Long = synchronized { | ||
| val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] | ||
| val acquired = doAcquireExecutionMemory(toGrant, evictedBlocks) | ||
| // Register evicted blocks, if any, with the active task metrics | ||
| Option(TaskContext.get()).foreach { tc => | ||
| val metrics = tc.taskMetrics() | ||
| val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) | ||
| metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) | ||
| } | ||
| memoryConsumptionForTask(taskAttemptId) += acquired | ||
| acquired | ||
| } | ||
|
|
||
| // Keep looping until we're either sure that we don't want to grant this request (because this | ||
| // task would have more than 1 / numActiveTasks of the memory) or we have enough free | ||
| // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)). | ||
| // TODO: simplify this to limit each task to its own slot | ||
| while (true) { | ||
| val numActiveTasks = taskMemory.keys.size | ||
| val curMem = taskMemory(taskAttemptId) | ||
| val freeMemory = maxExecutionMemory - taskMemory.values.sum | ||
| val numActiveTasks = memoryConsumptionForTask.keys.size | ||
| val curMem = memoryConsumptionForTask(taskAttemptId) | ||
| val freeMemory = maxExecutionMemory - memoryConsumptionForTask.values.sum | ||
|
|
||
| // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks; | ||
| // don't let it be negative | ||
|
|
@@ -217,62 +181,95 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int = 1) | |
| // (this happens if older tasks allocated lots of memory before N grew) | ||
| if ( | ||
| freeMemory >= math.min(maxToGrant, maxExecutionMemory / (2 * numActiveTasks) - curMem)) { | ||
| return acquire(toGrant, taskAttemptId) | ||
| return acquire(toGrant) | ||
| } else { | ||
| logInfo( | ||
| s"TID $taskAttemptId waiting for at least 1/2N of shuffle memory pool to be free") | ||
| s"TID $taskAttemptId waiting for at least 1/2N of execution memory pool to be free") | ||
| wait() | ||
| } | ||
| } else { | ||
| return acquire(toGrant, taskAttemptId) | ||
| return acquire(toGrant) | ||
| } | ||
| } | ||
| 0L // Never reached | ||
| } | ||
|
|
||
| /** | ||
| * Acquire N bytes of execution memory from the memory manager for the current task. | ||
| * @return number of bytes actually acquired (<= N). | ||
| */ | ||
| private def acquire(numBytes: Long, taskAttemptId: Long): Long = synchronized { | ||
| val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] | ||
| val acquired = acquireExecutionMemory(numBytes, evictedBlocks) | ||
| // Register evicted blocks, if any, with the active task metrics | ||
| // TODO: just do this in `acquireExecutionMemory` (SPARK-10985) | ||
| Option(TaskContext.get()).foreach { tc => | ||
| val metrics = tc.taskMetrics() | ||
| val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) | ||
| metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) | ||
| @VisibleForTesting | ||
| private[memory] def releaseExecutionMemory(numBytes: Long): Unit = synchronized { | ||
| if (numBytes > _executionMemoryUsed) { | ||
| logWarning(s"Attempted to release $numBytes bytes of execution " + | ||
| s"memory when we only have ${_executionMemoryUsed} bytes") | ||
| _executionMemoryUsed = 0 | ||
| } else { | ||
| _executionMemoryUsed -= numBytes | ||
| } | ||
| taskMemory(taskAttemptId) += acquired | ||
| acquired | ||
| } | ||
|
|
||
| /** Release numBytes bytes for the current task. */ | ||
| def release(numBytes: Long, taskAttemptId: Long): Unit = synchronized { | ||
| val curMem = taskMemory.getOrElse(taskAttemptId, 0L) | ||
| if (curMem < numBytes) { | ||
| /** | ||
| * Release numBytes of execution memory belonging to the given task. | ||
| */ | ||
| def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long): Unit = synchronized { | ||
| val curMem = memoryConsumptionForTask.getOrElse(taskAttemptId, 0L) | ||
| if (curMem < numBytes && taskAttemptId != -1) { // -1 is a dummy id used in some tests | ||
| throw new SparkException( | ||
| s"Internal error: release called on $numBytes bytes but task only has $curMem") | ||
| } | ||
| if (taskMemory.contains(taskAttemptId)) { | ||
| taskMemory(taskAttemptId) -= numBytes | ||
| if (memoryConsumptionForTask.contains(taskAttemptId)) { | ||
| memoryConsumptionForTask(taskAttemptId) -= numBytes | ||
| releaseExecutionMemory(numBytes) | ||
| } | ||
| notifyAll() // Notify waiters in acquireExecutionMemory that memory has been freed | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. outdated
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
| } | ||
|
|
||
| /** Release all memory for the current task and mark it as inactive (e.g. when a task ends). */ | ||
| private[memory] def releaseMemoryForTask(taskAttemptId: Long): Unit = synchronized { | ||
| taskMemory.remove(taskAttemptId).foreach { numBytes => | ||
| releaseExecutionMemory(numBytes) | ||
| /** Release all memory for the given task and mark it as inactive (e.g. when a task ends). */ | ||
| private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Unit = synchronized { | ||
| releaseExecutionMemory(getMemoryConsumptionForTask(taskAttemptId), taskAttemptId) | ||
| } | ||
|
|
||
| /** | ||
| * Release N bytes of storage memory. | ||
| */ | ||
| def releaseStorageMemory(numBytes: Long): Unit = synchronized { | ||
| if (numBytes > _storageMemoryUsed) { | ||
| logWarning(s"Attempted to release $numBytes bytes of storage " + | ||
| s"memory when we only have ${_storageMemoryUsed} bytes") | ||
| _storageMemoryUsed = 0 | ||
| } else { | ||
| _storageMemoryUsed -= numBytes | ||
| } | ||
| notifyAll() // Notify waiters in acquireExecutionMemory that memory has been freed | ||
| } | ||
|
|
||
| /** | ||
| * Release all storage memory acquired. | ||
| */ | ||
| def releaseAllStorageMemory(): Unit = synchronized { | ||
| _storageMemoryUsed = 0 | ||
| } | ||
|
|
||
| /** | ||
| * Release N bytes of unroll memory. | ||
| */ | ||
| def releaseUnrollMemory(numBytes: Long): Unit = synchronized { | ||
| releaseStorageMemory(numBytes) | ||
| } | ||
|
|
||
| /** | ||
| * Execution memory currently in use, in bytes. | ||
| */ | ||
| final def executionMemoryUsed: Long = synchronized { | ||
| _executionMemoryUsed | ||
| } | ||
|
|
||
| /** | ||
| * Storage memory currently in use, in bytes. | ||
| */ | ||
| final def storageMemoryUsed: Long = synchronized { | ||
| _storageMemoryUsed | ||
| } | ||
|
|
||
| /** Returns the memory consumption, in bytes, for the given task */ | ||
| private[memory] def getMemoryConsumptionForTask(taskAttemptId: Long): Long = synchronized { | ||
| taskMemory.getOrElse(taskAttemptId, 0L) | ||
| memoryConsumptionForTask.getOrElse(taskAttemptId, 0L) | ||
| } | ||
|
|
||
| // -- Methods related to Tungsten managed memory ------------------------------------------------- | ||
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executionMemoryForTask?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.