100 changes: 100 additions & 0 deletions core/src/main/scala/org/apache/spark/MemoryManager.scala
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockStatus}


/**
* An abstract memory manager that enforces how memory is shared between execution and storage.
Contributor:
Can you add a comment to clarify whether this is a one-per-JVM component or one-per-task component?

*
* In this context, execution memory refers to that used for computation in shuffles, joins,
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster.
*/
private[spark] abstract class MemoryManager {
Contributor:
Can you move this somewhere else that's not at the top level? I'd like to minimize the number of private[spark] classes that appear at the top level.

Contributor:
+1 on moving this out of the top level and into a new package (maybe spark.memory?). This will also provide a convenient way to group the memory-related classes together post-refactoring.


/**
* Acquire N bytes of memory for execution.
* @return number of bytes successfully granted (<= N).
*/
def acquireExecutionMemory(numBytes: Long): Long

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (0 or N).
*/
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long

/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (<= N).
*/
def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long

/**
* Release N bytes of execution memory.
*/
def releaseExecutionMemory(numBytes: Long): Unit

/**
* Release N bytes of storage memory.
*/
def releaseStorageMemory(numBytes: Long): Unit

/**
* Release all storage memory acquired.
Contributor:
This releases all storage memory acquired JVM-wide, across all tasks?

Contributor (author):
yes, it's called in MemoryStore#clear

Contributor:
Ah. It looks like BlockStore#clear is only called during SparkContext shutdown, so I guess it technically doesn't matter if you update the memory accounting there. Doesn't hurt, I guess, but not strictly necessary.

Contributor:
You know, if def releaseStorageMemory(numBytes: Long) doesn't throw when releasing more than the total amount of storage memory, you could just eliminate this method and call releaseStorageMemory(Long.MAX_VALUE) instead.

Contributor (author):
I think releasing more than you have is a bad sign. I actually log a warning for that. It's probably OK to just keep this method.

*/
def releaseStorageMemory(): Unit
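// For reference, a sketch of the reviewer's alternative above (not adopted):
// if the numBytes variant merely clamps instead of throwing, this no-arg
// method could in principle be replaced by a call such as
//   releaseStorageMemory(Long.MaxValue)
// The separate method was kept because over-releasing is treated as a bad
// sign and logs a warning in StaticMemoryManager.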

/**
* Release N bytes of unroll memory.
*/
def releaseUnrollMemory(numBytes: Long): Unit

/**
* Total available memory for execution, in bytes.
*/
def maxExecutionMemory: Long

/**
* Total available memory for storage, in bytes.
*/
def maxStorageMemory: Long

/**
* Execution memory currently in use, in bytes.
*/
def executionMemoryUsed: Long

/**
* Storage memory currently in use, in bytes.
*/
def storageMemoryUsed: Long

}
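To illustrate the contract (a minimal sketch with a hypothetical caller, not part of this diff): acquireExecutionMemory may grant fewer bytes than requested, while acquireStorageMemory is all-or-nothing, so callers must always check the returned grant.

def runWithExecutionMemory(memoryManager: MemoryManager, numBytes: Long): Unit = {
  val granted = memoryManager.acquireExecutionMemory(numBytes)
  try {
    if (granted < numBytes) {
      // Partial grant (granted <= numBytes): the caller must shrink its
      // request or spill to disk before proceeding.
    }
    // ... compute within `granted` bytes ...
  } finally {
    // Release exactly what was granted, not what was requested.
    memoryManager.releaseExecutionMemory(granted)
  }
}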
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -69,6 +69,8 @@ class SparkEnv (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
+ // TODO: unify these *MemoryManager classes
Contributor:
There's a JIRA for this, so you could mention the number here.

+ val memoryManager: MemoryManager,
val shuffleMemoryManager: ShuffleMemoryManager,
val executorMemoryManager: ExecutorMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
@@ -323,7 +325,8 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

- val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
+ val memoryManager = new StaticMemoryManager(conf)
+ val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores)

val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)

@@ -334,8 +337,8 @@

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
- serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
- numUsableCores)
+ serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
+ blockTransferService, securityManager, numUsableCores)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

@@ -407,6 +410,7 @@ object SparkEnv extends Logging {
httpFileServer,
sparkFilesDir,
metricsSystem,
+ memoryManager,
shuffleMemoryManager,
executorMemoryManager,
outputCommitCoordinator,
229 changes: 229 additions & 0 deletions core/src/main/scala/org/apache/spark/StaticMemoryManager.scala
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}


/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
*
* The sizes of the execution and storage regions are determined through
* `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
* regions are cleanly separated such that neither usage can borrow memory from the other.
*/
private[spark] class StaticMemoryManager(
conf: SparkConf,
override val maxExecutionMemory: Long,
override val maxStorageMemory: Long)
extends MemoryManager with Logging {

// Max number of bytes worth of blocks to evict when unrolling
private val maxMemoryToEvictForUnroll: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

// Amount of execution memory in use. Accesses must be synchronized on `executionLock`.
private var _executionMemoryUsed: Long = 0
private val executionLock = new Object
Contributor:
Just curious: do we need two separate locks, or would using a single lock by synchronizing on this suffice?

Contributor (author):
I just thought we'd have a little more parallelism if we use two locks


// Amount of storage memory in use. Accesses must be synchronized on `storageLock`.
private var _storageMemoryUsed: Long = 0
private val storageLock = new Object

// The memory store used to evict cached blocks
private var _memoryStore: MemoryStore = _
private def memoryStore: MemoryStore = {
if (_memoryStore == null) {
_memoryStore = SparkEnv.get.blockManager.memoryStore
}
_memoryStore
}

// For testing only
def setMemoryStore(store: MemoryStore): Unit = {
Contributor:
Why not just accept the MemoryStore via the constructor?

Contributor (author):
initialization order :(

Contributor:
Would it be possible to explicitly call this method in the non-testing path as well? e.g. remove the SparkEnv.get from inside of this class and call this when wiring up the components?

Contributor (author):
yeah, in BlockManager's constructor we can do the following:

private val memoryStore = new MemoryStore(...)
memoryManager.setMemoryStore(memoryStore)

do you think that's better?

Contributor:
Yes, I think that's clearer.

_memoryStore = store
}

def this(conf: SparkConf) {
this(
conf,
StaticMemoryManager.getMaxExecutionMemory(conf),
StaticMemoryManager.getMaxStorageMemory(conf))
}

/**
* Acquire N bytes of memory for execution.
* @return number of bytes successfully granted (<= N).
*/
override def acquireExecutionMemory(numBytes: Long): Long = {
executionLock.synchronized {
assert(_executionMemoryUsed <= maxExecutionMemory)
val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
_executionMemoryUsed += bytesToGrant
bytesToGrant
}
}

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (0 or N).
*/
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
}

/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
*
* This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage
* space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any,
* are added to `evictedBlocks`.
*
* @return number of bytes successfully granted (0 or N).
*/
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
val currentUnrollMemory = memoryStore.currentUnrollMemory
Contributor:
Are there any synchronization concerns here w.r.t. currentUnrollMemory? Haven't thought this through, but just wanted to ask.

Contributor (author):
there's this accountingLock in MemoryStore that you always acquire before requesting any storage / unroll memory, so it should already be handled there. Maybe we should add a comment there to express this assumption.

Contributor (author):
added in reserveUnrollMemoryForThisTask

val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
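// Worked example (hypothetical numbers, not from this diff): with
// maxStorageMemory = 1000 and spark.storage.unrollFraction = 0.2,
// maxMemoryToEvictForUnroll = 200. If currentUnrollMemory is already 150 and
// a block asks to unroll numBytes = 120, then
// maxNumBytesToFree = max(0, 200 - 150) = 50 and
// numBytesToFree = min(120, 50) = 50, so at most 50 bytes of cached blocks
// may be evicted to satisfy this request.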

/**
* Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
*
* @param blockId the ID of the block we are acquiring storage memory for
* @param numBytesToAcquire the size of this block
* @param numBytesToFree the size of space to be freed through evicting blocks
* @param evictedBlocks a holder for blocks evicted in the process
* @return number of bytes successfully granted (0 or N).
*/
private def acquireStorageMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
// Note: Keep this outside synchronized block to avoid potential deadlocks!
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
storageLock.synchronized {
assert(_storageMemoryUsed <= maxStorageMemory)
val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
val bytesToGrant = if (enoughMemory) numBytesToAcquire else 0
_storageMemoryUsed += bytesToGrant
bytesToGrant
}
}

/**
* Release N bytes of execution memory.
*/
override def releaseExecutionMemory(numBytes: Long): Unit = {
executionLock.synchronized {
if (numBytes > _executionMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of execution " +
s"memory when we only have ${_executionMemoryUsed} bytes")
_executionMemoryUsed = 0
} else {
_executionMemoryUsed -= numBytes
}
}
}

/**
* Release N bytes of storage memory.
*/
override def releaseStorageMemory(numBytes: Long): Unit = {
storageLock.synchronized {
if (numBytes > _storageMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of storage " +
s"memory when we only have ${_storageMemoryUsed} bytes")
_storageMemoryUsed = 0
} else {
_storageMemoryUsed -= numBytes
}
}
}

/**
* Release all storage memory acquired.
*/
override def releaseStorageMemory(): Unit = {
storageLock.synchronized {
_storageMemoryUsed = 0
}
}

/**
* Release N bytes of unroll memory.
*/
override def releaseUnrollMemory(numBytes: Long): Unit = {
releaseStorageMemory(numBytes)
}

/**
* Amount of execution memory currently in use, in bytes.
*/
override def executionMemoryUsed: Long = {
executionLock.synchronized {
_executionMemoryUsed
}
}

/**
* Amount of storage memory currently in use, in bytes.
*/
override def storageMemoryUsed: Long = {
storageLock.synchronized {
_storageMemoryUsed
}
}

}


private[spark] object StaticMemoryManager {

/**
* Return the total amount of memory available for the storage region, in bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}


/**
* Return the total amount of memory available for the execution region, in bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

}
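To make the defaults above concrete, here is an illustrative calculation (a hypothetical 1 GiB heap, i.e. Runtime.getRuntime.maxMemory returning 1073741824, with none of the fractions overridden):

val heapBytes = 1024L * 1024 * 1024
val maxStorage = (heapBytes * 0.6 * 0.9).toLong   // 579820584 bytes, ~553 MiB (54% of the heap)
val maxExecution = (heapBytes * 0.2 * 0.8).toLong // 171798691 bytes, ~164 MiB (16% of the heap)
// The remaining ~30% is safety headroom left for the rest of the JVM.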